Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持ClickHouse上线 #1403

Merged
merged 1 commit into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Archery是[archer](https://github.com/jly8866/archer)的分支项目,定位于
| MongoDB | √ | √ | √ | × | × | × | × | × | × | × |
| Phoenix | √ | × | √ | × | × | × | × | × | × | × |
| ODPS | √ | × | × | × | × | × | × | × | × | × |
| ClickHouse | √ | × | × | × | × | × | × | × | × | × |
| ClickHouse | √ | | | × | × | × | × | × | × | × |



Expand Down
238 changes: 236 additions & 2 deletions sql/engines/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: UTF-8 -*-
from clickhouse_driver import connect
from sql.utils.sql_utils import get_syntax_type
from .models import ResultSet, ReviewResult, ReviewSet
from common.utils.timer import FuncTimer
from common.config import SysConfig
from . import EngineBase
from .models import ResultSet
import sqlparse
import logging
import re
Expand All @@ -13,6 +16,7 @@ class ClickHouseEngine(EngineBase):

def __init__(self, instance=None):
super(ClickHouseEngine, self).__init__(instance=instance)
self.config = SysConfig()

def get_connection(self, db_name=None):
if self.conn:
Expand Down Expand Up @@ -45,12 +49,25 @@ def server_version(self):
version = result.rows[0][0].split(' ')[1]
return tuple([int(n) for n in version.split('.')[:3]])

def get_table_engine(self, tb_name):
"""获取某个table的engine type"""
sql = f"""select engine
from system.tables
where database='{tb_name.split('.')[0]}'
and name='{tb_name.split('.')[1]}'"""
query_result = self.query(sql=sql)
if query_result.rows:
result = {'status': 1, 'engine': query_result.rows[0][0]}
else:
result = {'status': 0, 'engine': 'None'}
return result

def get_all_databases(self):
"""获取数据库列表, 返回一个ResultSet"""
sql = "show databases"
result = self.query(sql=sql)
db_list = [row[0] for row in result.rows
if row[0] not in ('system','INFORMATION_SCHEMA', 'information_schema', 'datasets')]
if row[0] not in ('system', 'INFORMATION_SCHEMA', 'information_schema', 'datasets')]
result.rows = db_list
return result

Expand Down Expand Up @@ -170,6 +187,223 @@ def filter_sql(self, sql='', limit_num=0):
sql = f'{sql};'
return sql

def explain_check(self, check_result, db_name=None, line=0, statement=''):
"""使用explain ast检查sql语法, 返回Review set"""
result = ReviewResult(id=line, errlevel=0,
stagestatus='Audit completed',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=0, )
# clickhouse版本>=20.6.3才使用explain检查
if self.server_version >= (20, 6, 3):
explain_result = self.query(db_name=db_name, sql=f"explain ast {statement}")
if explain_result.error:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回未通过检查SQL',
errormessage=f'explain语法检查错误:{explain_result.error}',
sql=statement)
return result

def execute_check(self, db_name=None, sql=''):
"""上线单执行前的检查, 返回Review set"""
sql = sqlparse.format(sql, strip_comments=True)
sql_list = sqlparse.split(sql)

# 禁用/高危语句检查
check_result = ReviewSet(full_sql=sql)
line = 1
critical_ddl_regex = self.config.get('critical_ddl_regex', '')
p = re.compile(critical_ddl_regex)
check_result.syntax_type = 2 # TODO 工单类型 0、其他 1、DDL,2、DML

for statement in sql_list:
statement = statement.rstrip(';')
# 禁用语句
if re.match(r"^select|^show", statement.lower()):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持语句',
errormessage='仅支持DML和DDL语句,查询语句请使用SQL查询功能!',
sql=statement)
# 高危语句
elif critical_ddl_regex and p.match(statement.strip().lower()):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回高危SQL',
errormessage='禁止提交匹配' + critical_ddl_regex + '条件的语句!',
sql=statement)
# alter语句
elif re.match(r"^alter", statement.lower()):
# alter table语句
if re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower()):
table_name = re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_engine = self.get_table_engine(table_name)['engine']
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
if not table_engine.endswith('MergeTree') and table_engine not in ('Merge', 'Distributed'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='ALTER TABLE仅支持*MergeTree,Merge以及Distributed等引擎表!',
sql=statement)
else:
# delete与update语句,实际是alter语句的变种
if re.match(r"^alter\s+table\s+(.+?)\s+(delete|update)\s+", statement.lower()):
if not table_engine.endswith('MergeTree'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='DELETE与UPDATE仅支持*MergeTree引擎表!',
sql=statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
# 其他alter语句
else:
result = self.explain_check(check_result, db_name, line, statement)
# truncate语句
elif re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower()):
table_name = re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_engine = self.get_table_engine(table_name)['engine']
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
if table_engine in ('View', 'File,', 'URL', 'Buffer', 'Null'):
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='TRUNCATE不支持View,File,URL,Buffer和Null表引擎!',
sql=statement)
else:
result = self.explain_check(check_result, db_name, line, statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
# insert语句,explain无法正确判断,暂时只做表存在性检查与简单关键字匹配
elif re.match(r"^insert", statement.lower()):
if re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()", statement.lower()):
table_name = re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()",
statement.lower(), re.M).group(1)
if '.' not in table_name:
table_name = f"{db_name}.{table_name}"
table_exist = self.get_table_engine(table_name)['status']
if table_exist == 1:
result = ReviewResult(id=line, errlevel=0,
stagestatus='Audit completed',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=0, )
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='表不存在',
errormessage=f'表 {table_name} 不存在!',
sql=statement)
else:
check_result.is_critical = True
result = ReviewResult(id=line, errlevel=2,
stagestatus='驳回不支持SQL',
errormessage='INSERT语法不正确!',
sql=statement)
# 其他语句使用explain ast简单检查
else:
result = self.explain_check(check_result, db_name, line, statement)

# 没有找出DDL语句的才继续执行此判断
if check_result.syntax_type == 2:
if get_syntax_type(statement, parser=False, db_type='mysql') == 'DDL':
check_result.syntax_type = 1
check_result.rows += [result]

# 遇到禁用和高危语句直接返回
if check_result.is_critical:
check_result.error_count += 1
return check_result
line += 1
return check_result

def execute_workflow(self, workflow):
"""执行上线单,返回Review set"""
sql = workflow.sqlworkflowcontent.sql_content
execute_result = ReviewSet(full_sql=sql)
sqls = sqlparse.format(sql, strip_comments=True)
sql_list = sqlparse.split(sqls)

line = 1
for statement in sql_list:
with FuncTimer() as t:
result = self.execute(db_name=workflow.db_name, sql=statement, close_conn=True)
if not result.error:
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus='Execute Successfully',
errormessage='None',
sql=statement,
affected_rows=0,
execute_time=t.cost,
))
line += 1
else:
# 追加当前报错语句信息到执行结果中
execute_result.error = result.error
execute_result.rows.append(ReviewResult(
id=line,
errlevel=2,
stagestatus='Execute Failed',
errormessage=f'异常信息:{result.error}',
sql=statement,
affected_rows=0,
execute_time=0,
))
line += 1
# 报错语句后面的语句标记为审核通过、未执行,追加到执行结果中
for statement in sql_list[line - 1:]:
execute_result.rows.append(ReviewResult(
id=line,
errlevel=0,
stagestatus='Audit completed',
errormessage=f'前序语句失败, 未执行',
sql=statement,
affected_rows=0,
execute_time=0,
))
line += 1
break
return execute_result

def execute(self, db_name=None, sql='', close_conn=True):
"""原生执行语句"""
result = ResultSet(full_sql=sql)
conn = self.get_connection(db_name=db_name)
try:
cursor = conn.cursor()
for statement in sqlparse.split(sql):
cursor.execute(statement)
cursor.close()
except Exception as e:
logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}")
result.error = str(e).split('Stack trace')[0]
if close_conn:
self.close()
return result

def close(self):
if self.conn:
self.conn.close()
Expand Down
Loading