Skip to content

Commit

Permalink
[fix](dbt)fix dbt incremental apache#16840
Browse files Browse the repository at this point in the history
Fix dbt incremental: new approach that needs no rollback and supports rerunning incremental data.
add snapshot
Use the 'mysql-connector-python' MySQL driver in place of the 'MySQLdb' driver.
  • Loading branch information
catpineapple authored Feb 18, 2023
1 parent 861e4bc commit 45dbd4d
Show file tree
Hide file tree
Showing 29 changed files with 653 additions and 2,098 deletions.
29 changes: 29 additions & 0 deletions extension/dbt-doris/.flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[flake8]
select =
E
W
F
ignore =
W503 # makes Flake8 work like black
W504
E203 # makes Flake8 work like black
E741
E501
exclude = tests
72 changes: 72 additions & 0 deletions extension/dbt-doris/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# For more on configuring pre-commit hooks, see https://pre-commit.com/

# TODO: remove global exclusion of tests when testing overhaul is complete
exclude: "^tests/.*"


default_language_version:
python: python3.8

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: check-yaml
args: [--unsafe]
- id: check-json
- id: end-of-file-fixer
- id: trailing-whitespace
- id: check-case-conflict
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
args:
- "--line-length=99"
- "--target-version=py38"
- id: black
alias: black-check
stages: [manual]
args:
- "--line-length=99"
- "--target-version=py38"
- "--check"
- "--diff"
# flake8 is hosted on GitHub; the former GitLab location no longer serves the hook.
- repo: https://github.com/PyCQA/flake8
  rev: 4.0.1
  hooks:
  - id: flake8
  - id: flake8
    alias: flake8-check
    stages: [manual]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.782
hooks:
- id: mypy
args: [--show-error-codes, --ignore-missing-imports]
files: ^dbt/adapters/.*
language: system
- id: mypy
alias: mypy-check
stages: [manual]
args: [--show-error-codes, --pretty, --ignore-missing-imports]
files: ^dbt/adapters
language: system

21 changes: 21 additions & 0 deletions extension/dbt-doris/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# encoding: utf-8

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

recursive-include dbt/include *.sql *.yml *.md
11 changes: 7 additions & 4 deletions extension/dbt-doris/dbt/adapters/doris/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
# specific language governing permissions and limitations
# under the License.

from dbt.adapters.base import AdapterPlugin
from dbt.adapters.doris.connections import DorisAdapterCredentials
from dbt.adapters.doris.connections import DorisConnectionManager # noqa
from dbt.adapters.doris.connections import DorisCredentials
from dbt.adapters.doris.impl import DorisAdapter

from dbt.adapters.base import AdapterPlugin
from dbt.include import doris


Plugin = AdapterPlugin(
adapter=DorisAdapter,
credentials=DorisAdapterCredentials,
credentials=DorisCredentials,
include_path=doris.PACKAGE_PATH,
)
)
2 changes: 1 addition & 1 deletion extension/dbt-doris/dbt/adapters/doris/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
# this 'version' must be set !!!
# otherwise the adapters will not be found after the 'dbt init xxx' command

version = "0.1.0"
version = "1.3.0"
73 changes: 50 additions & 23 deletions extension/dbt-doris/dbt/adapters/doris/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from dataclasses import dataclass
from typing import ContextManager, Optional, Union

import MySQLdb
import MySQLdb.cursors
import mysql.connector

from dbt import exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
Expand All @@ -34,14 +34,15 @@


@dataclass
class DorisAdapterCredentials(Credentials):
class DorisCredentials(Credentials):
host: str = "127.0.0.1"
port: int = 9030
username: str = "root"
password: str = ""
database: Optional[str] = None
schema: Optional[str] = None


@property
def type(self):
return "doris"
Expand All @@ -51,7 +52,7 @@ def _connection_keys(self):

@property
def unique_field(self) -> str:
return self.host
return self.schema

def __post_init__(self):
if self.database is not None and self.database != self.schema:
Expand All @@ -63,49 +64,73 @@ def __post_init__(self):
)


class DorisAdapterConnectionManager(SQLConnectionManager):
class DorisConnectionManager(SQLConnectionManager):
TYPE = "doris"

@classmethod
def open(cls, connection: Connection) -> Connection:
    """Open a mysql.connector connection for ``connection`` and return it.

    If ``connection.state`` is already "open" the connection is returned
    untouched. Otherwise a first attempt is made with host, port, user and
    password only; if the driver reports an error, one retry is made with
    ``database`` set to ``credentials.schema``.

    Raises:
        exceptions.FailedToConnectException: when the retry also fails. The
            handle is set to ``None`` and the state to "fail" before raising.
    """
    if connection.state == "open":
        logger.debug("Connection is already open, skipping open")
        return connection

    credentials = cls.get_credentials(connection.credentials)
    kwargs = {
        "host": credentials.host,
        "port": credentials.port,
        "user": credentials.username,
        "password": credentials.password,
        "buffered": True,
        "charset": "utf8",
        "get_warnings": True,
    }

    try:
        connection.handle = mysql.connector.connect(**kwargs)
        connection.state = 'open'
    except mysql.connector.Error:
        try:
            logger.debug("Failed connection without supplying the `database`. "
                         "Trying again with `database` included.")
            # Make the retry actually differ from the first attempt, as the
            # log message above announces.
            kwargs["database"] = credentials.schema
            connection.handle = mysql.connector.connect(**kwargs)
            connection.state = 'open'
        except mysql.connector.Error as e:
            logger.debug("Got an error when attempting to open a mysql "
                         "connection: '{}'"
                         .format(e))

            connection.handle = None
            connection.state = 'fail'

            # Use the `exceptions` name this module imports from dbt.
            raise exceptions.FailedToConnectException(str(e)) from e
    return connection

@classmethod
def get_credentials(cls, credentials):
    """Return ``credentials`` unchanged."""
    return credentials

@classmethod
def cancel(self, connection: Connection):
    """Cancel work on ``connection`` by closing its underlying handle."""
    # NOTE(review): declared as a classmethod, so `self` is bound to the class here.
    connection.handle.close()

@classmethod
def get_response(cls, cursor: MySQLdb.cursors.Cursor) -> Union[AdapterResponse, str]:
code = "Unknown cursor state/status"
rows = cursor.rowcount
def get_response(cls, cursor) -> Union[AdapterResponse, str]:
code = "SUCCESS"
num_rows = 0

if cursor is not None and cursor.rowcount is not None:
num_rows = cursor.rowcount
return AdapterResponse(
code=code,
_message=f"{rows} rows affected",
rows_affected=rows,
_message=f"{num_rows} rows affected",
rows_affected=num_rows,
)

@contextmanager # type: ignore
def exception_handler(self, sql: str) -> ContextManager: # type: ignore
@contextmanager
def exception_handler(self, sql: str) -> ContextManager:
try:
yield
except MySQLdb.DatabaseError as e:
except mysql.connector.DatabaseError as e:
logger.debug(f"Doris database error: {e}, sql: {sql}")
raise exceptions.DatabaseException(str(e)) from e
except Exception as e:
Expand All @@ -114,12 +139,14 @@ def exception_handler(self, sql: str) -> ContextManager: # type: ignore
raise e
raise exceptions.RuntimeException(str(e)) from e

@classmethod
def begin(self):
    """
    https://doris.apache.org/docs/data-operate/import/import-scenes/load-atomicity/
    Intentionally a no-op: Doris inserts are always transactional, so no BEGIN is issued.
    """
    pass

@classmethod
def commit(self):
    """Do nothing; no explicit COMMIT is issued by this connection manager."""
    pass
32 changes: 23 additions & 9 deletions extension/dbt-doris/dbt/adapters/doris/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
# specific language governing permissions and limitations
# under the License.

from dbt.adapters.sql import SQLAdapter

from concurrent.futures import Future
from enum import Enum
from typing import Callable, Dict, List, Optional, Set, Tuple

import agate
import dbt.exceptions
from dbt.adapters.base.impl import _expect_row_value, catch_as_completed
from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.base.relation import InformationSchema, BaseRelation
from dbt.adapters.doris.column import DorisColumn
from dbt.adapters.doris.connections import DorisAdapterConnectionManager
from dbt.adapters.doris.connections import DorisConnectionManager
from dbt.adapters.doris.relation import DorisRelation
from dbt.adapters.protocol import AdapterConfig
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import LIST_RELATIONS_MACRO_NAME, LIST_SCHEMAS_MACRO_NAME
from dbt.clients.agate_helper import table_from_rows
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -63,7 +64,7 @@ class DorisConfig(AdapterConfig):


class DorisAdapter(SQLAdapter):
ConnectionManager = DorisAdapterConnectionManager
ConnectionManager = DorisConnectionManager
Relation = DorisRelation
AdapterSpecificConfigs = DorisConfig
Column = DorisColumn
Expand Down Expand Up @@ -95,6 +96,15 @@ def get_relation(self, database: Optional[str], schema: str, identifier: str):

return super().get_relation(database, schema, identifier)

def drop_schema(self, relation: BaseRelation):
    """Drop every relation listed in the schema, then drop the schema itself.

    Args:
        relation: Relation whose ``database`` and ``schema`` identify the
            schema to remove.
    """
    schema_members = self.list_relations(
        database=relation.database,
        schema=relation.schema,
    )
    # Use a distinct loop name: reusing `relation` would make the final call
    # receive the last listed relation instead of the schema passed in.
    for member in schema_members:
        self.drop_relation(member)
    super().drop_schema(relation)

def list_relations_without_caching(self, schema_relation: DorisRelation) -> List[DorisRelation]:
kwargs = {"schema_relation": schema_relation}
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
Expand All @@ -120,11 +130,7 @@ def list_relations_without_caching(self, schema_relation: DorisRelation) -> List

def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)
if len(schema_map) > 1:
dbt.exceptions.raise_compiler_error(
f"Expected only one database in get_catalog, found " f"{list(schema_map)}"
)


with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
Expand Down Expand Up @@ -176,3 +182,11 @@ def _get_one_catalog(
)

return super()._get_one_catalog(information_schema, schemas, manifest)

# Methods used in adapter tests
def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
    """Build a SQL expression that adds ``number`` ``interval`` units to ``add_to``.

    The arguments are interpolated as given, producing
    ``<add_to> + interval <number> <interval>``. The defaults (1 hour) exist
    for backwards compatibility.
    """
    # The '+ interval' form is the one used by postgres/redshift.
    shifted = f"{add_to} + interval {number} {interval}"
    return shifted
2 changes: 1 addition & 1 deletion extension/dbt-doris/dbt/include/doris/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# under the License.

name: dbt_doris
version: 1.0
version: 1.3.0
config-version: 2

macro-paths: ["macros"]
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ where table_schema = '{{ relation.schema }}'
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{%- endmacro %}

{# Stub: the macro body is only the quoted text below, which Jinja renders literally; no ALTER statement is built here. #}
{% macro doris__alter_column_type(relation,column_name,new_column_type) -%}
'''Changes column name or data type'''
{% endmacro %}
Loading

0 comments on commit 45dbd4d

Please sign in to comment.