Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持ClickHouse查询 #1384

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ httptools==0.1.1
uvicorn==0.12.2
pycryptodome==3.10.1
pyodps==0.10.7.1
clickhouse-driver==0.2.3
6 changes: 6 additions & 0 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,10 @@ def get_engine(instance=None): # pragma: no cover

elif instance.db_type == 'odps':
from .odps import ODPSEngine

return ODPSEngine(instance=instance)

elif instance.db_type == 'clickhouse':
from .clickhouse import ClickHouseEngine

return ClickHouseEngine(instance=instance)
176 changes: 176 additions & 0 deletions sql/engines/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: UTF-8 -*-
from clickhouse_driver import connect
from . import EngineBase
from .models import ResultSet
import sqlparse
import logging
import re

logger = logging.getLogger('default')


class ClickHouseEngine(EngineBase):

def __init__(self, instance=None):
super(ClickHouseEngine, self).__init__(instance=instance)

def get_connection(self, db_name=None):
if self.conn:
return self.conn
if db_name:
self.conn = connect(host=self.host, port=self.port, user=self.user, password=self.password,
database=db_name, connect_timeout=10)
else:
self.conn = connect(host=self.host, port=self.port, user=self.user, password=self.password,
connect_timeout=10)
return self.conn

@property
def name(self):
return 'ClickHouse'

@property
def info(self):
return 'ClickHouse engine'

@property
def auto_backup(self):
"""是否支持备份"""
return False

@property
def server_version(self):
sql = "select value from system.build_options where name = 'VERSION_FULL';"
result = self.query(sql=sql)
version = result.rows[0][0].split(' ')[1]
return tuple([int(n) for n in version.split('.')[:3]])

def get_all_databases(self):
"""获取数据库列表, 返回一个ResultSet"""
sql = "show databases"
result = self.query(sql=sql)
db_list = [row[0] for row in result.rows
if row[0] not in ('system','INFORMATION_SCHEMA', 'information_schema', 'datasets')]
result.rows = db_list
return result

def get_all_tables(self, db_name, **kwargs):
"""获取table 列表, 返回一个ResultSet"""
sql = "show tables"
result = self.query(db_name=db_name, sql=sql)
tb_list = [row[0] for row in result.rows]
result.rows = tb_list
return result

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet"""
sql = f"""select
name,
type,
comment
from
system.columns
where
database = '{db_name}'
and table = '{tb_name}';"""
result = self.query(db_name=db_name, sql=sql)
column_list = [row[0] for row in result.rows]
result.rows = column_list
return result

def describe_table(self, db_name, tb_name, **kwargs):
"""return ResultSet 类似查询"""
sql = f"show create table `{tb_name}`;"
result = self.query(db_name=db_name, sql=sql)

result.rows[0] = (tb_name,) + (result.rows[0][0].replace('(', '(\n ').replace(',', ',\n '),)
return result

def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs):
"""返回 ResultSet """
result_set = ResultSet(full_sql=sql)
try:
conn = self.get_connection(db_name=db_name)
cursor = conn.cursor()
cursor.execute(sql)
if int(limit_num) > 0:
rows = cursor.fetchmany(size=int(limit_num))
else:
rows = cursor.fetchall()
fields = cursor.description

result_set.column_list = [i[0] for i in fields] if fields else []
result_set.rows = rows
result_set.affected_rows = len(rows)
except Exception as e:
logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}")
result_set.error = str(e).split('Stack trace')[0]
finally:
if close_conn:
self.close()
return result_set

def query_check(self, db_name=None, sql=''):
# 查询语句的检查、注释去除、切分
result = {'msg': '', 'bad_query': False, 'filtered_sql': sql, 'has_star': False}
# 删除注释语句,进行语法判断,执行第一条有效sql
try:
sql = sqlparse.format(sql, strip_comments=True)
sql = sqlparse.split(sql)[0]
result['filtered_sql'] = sql.strip()
except IndexError:
result['bad_query'] = True
result['msg'] = '没有有效的SQL语句'
if re.match(r"^select|^show|^explain", sql, re.I) is None:
result['bad_query'] = True
result['msg'] = '不支持的查询语法类型!'
if '*' in sql:
result['has_star'] = True
result['msg'] = 'SQL语句中含有 * '
# clickhouse 20.6.3版本开始正式支持explain语法
if re.match(r"^explain", sql, re.I) and self.server_version < (20, 6, 3):
result['bad_query'] = True
result['msg'] = f"当前ClickHouse实例版本低于20.6.3,不支持explain!"
# select语句先使用Explain判断语法是否正确
if re.match(r"^select", sql, re.I) and self.server_version >= (20, 6, 3):
explain_result = self.query(db_name=db_name, sql=f"explain {sql}")
if explain_result.error:
result['bad_query'] = True
result['msg'] = explain_result.error

return result

def filter_sql(self, sql='', limit_num=0):
# 对查询sql增加limit限制,limit n 或 limit n,n 或 limit n offset n统一改写成limit n
sql = sql.rstrip(';').strip()
if re.match(r"^select", sql, re.I):
# LIMIT N
limit_n = re.compile(r'limit\s+(\d+)\s*$', re.I)
# LIMIT M OFFSET N
limit_offset = re.compile(r'limit\s+(\d+)\s+offset\s+(\d+)\s*$', re.I)
# LIMIT M,N
offset_comma_limit = re.compile(r'limit\s+(\d+)\s*,\s*(\d+)\s*$', re.I)
if limit_n.search(sql):
sql_limit = limit_n.search(sql).group(1)
limit_num = min(int(limit_num), int(sql_limit))
sql = limit_n.sub(f'limit {limit_num};', sql)
elif limit_offset.search(sql):
sql_limit = limit_offset.search(sql).group(1)
sql_offset = limit_offset.search(sql).group(2)
limit_num = min(int(limit_num), int(sql_limit))
sql = limit_offset.sub(f'limit {limit_num} offset {sql_offset};', sql)
elif offset_comma_limit.search(sql):
sql_offset = offset_comma_limit.search(sql).group(1)
sql_limit = offset_comma_limit.search(sql).group(2)
limit_num = min(int(limit_num), int(sql_limit))
sql = offset_comma_limit.sub(f'limit {sql_offset},{limit_num};', sql)
else:
sql = f'{sql} limit {limit_num};'
else:
sql = f'{sql};'
return sql

def close(self):
if self.conn:
self.conn.close()
self.conn = None
141 changes: 141 additions & 0 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sql.engines.pgsql import PgSQLEngine
from sql.engines.oracle import OracleEngine
from sql.engines.mongo import MongoEngine
from sql.engines.clickhouse import ClickHouseEngine
from sql.models import Instance, SqlWorkflow, SqlWorkflowContent

User = get_user_model()
Expand Down Expand Up @@ -1646,3 +1647,143 @@ def test_fill_query_columns(self):
{"_id": {"$oid": "7f10162029684728e70045ab"}, "author": "archery"}]
cols = self.engine.fill_query_columns(cursor, columns=columns)
self.assertEqual(cols, ["_id", "title", "tags", "likes", "text", "author"])


class TestClickHouse(TestCase):

def setUp(self):
self.ins1 = Instance(instance_name='some_ins', type='slave', db_type='clickhouse', host='some_host',
port=9000, user='ins_user', password='some_str')
self.ins1.save()
self.sys_config = SysConfig()

def tearDown(self):
self.ins1.delete()
self.sys_config.purge()

@patch.object(ClickHouseEngine, 'query')
def test_server_version(self, mock_query):
result = ResultSet()
result.rows = [('ClickHouse 22.1.3.7',)]
mock_query.return_value = result
new_engine = ClickHouseEngine(instance=self.ins1)
server_version = new_engine.server_version
self.assertTupleEqual(server_version, (22, 1, 3))

@patch('clickhouse_driver.connect')
def test_engine_base_info(self, _conn):
new_engine = ClickHouseEngine(instance=self.ins1)
self.assertEqual(new_engine.name, 'ClickHouse')
self.assertEqual(new_engine.info, 'ClickHouse engine')

@patch.object(ClickHouseEngine, 'get_connection')
def testGetConnection(self, connect):
new_engine = ClickHouseEngine(instance=self.ins1)
new_engine.get_connection()
connect.assert_called_once()

@patch.object(ClickHouseEngine, 'query')
def testQuery(self, mock_query):
result = ResultSet()
result.rows = [('v1', 'v2'), ]
mock_query.return_value = result
new_engine = ClickHouseEngine(instance=self.ins1)
query_result = new_engine.query(sql='some_sql', limit_num=100)
self.assertListEqual(query_result.rows, [('v1', 'v2'), ])

@patch.object(ClickHouseEngine, 'query')
def testAllDb(self, mock_query):
db_result = ResultSet()
db_result.rows = [('db_1',), ('db_2',)]
mock_query.return_value = db_result
new_engine = ClickHouseEngine(instance=self.ins1)
dbs = new_engine.get_all_databases()
self.assertEqual(dbs.rows, ['db_1', 'db_2'])

@patch.object(ClickHouseEngine, 'query')
def testAllTables(self, mock_query):
table_result = ResultSet()
table_result.rows = [('tb_1', 'some_des'), ('tb_2', 'some_des')]
mock_query.return_value = table_result
new_engine = ClickHouseEngine(instance=self.ins1)
tables = new_engine.get_all_tables('some_db')
mock_query.assert_called_once_with(db_name='some_db', sql=ANY)
self.assertEqual(tables.rows, ['tb_1', 'tb_2'])

@patch.object(ClickHouseEngine, 'query')
def testAllColumns(self, mock_query):
db_result = ResultSet()
db_result.rows = [('col_1', 'type'), ('col_2', 'type2')]
mock_query.return_value = db_result
new_engine = ClickHouseEngine(instance=self.ins1)
dbs = new_engine.get_all_columns_by_tb('some_db', 'some_tb')
self.assertEqual(dbs.rows, ['col_1', 'col_2'])

@patch.object(ClickHouseEngine, 'query')
def testDescribe(self, mock_query):
new_engine = ClickHouseEngine(instance=self.ins1)
new_engine.describe_table('some_db', 'some_db')
mock_query.assert_called_once()

def test_query_check_wrong_sql(self):
new_engine = ClickHouseEngine(instance=self.ins1)
wrong_sql = '-- 测试'
check_result = new_engine.query_check(db_name='some_db', sql=wrong_sql)
self.assertDictEqual(check_result,
{'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': '-- 测试', 'has_star': False})

def test_query_check_update_sql(self):
new_engine = ClickHouseEngine(instance=self.ins1)
update_sql = 'update user set id=0'
check_result = new_engine.query_check(db_name='some_db', sql=update_sql)
self.assertDictEqual(check_result,
{'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': 'update user set id=0',
'has_star': False})

def test_filter_sql_with_delimiter(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable;'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100)
self.assertEqual(check_result, 'select user from usertable limit 100;')

def test_filter_sql_without_delimiter(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100)
self.assertEqual(check_result, 'select user from usertable limit 100;')

def test_filter_sql_with_limit(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable limit 10'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1)
self.assertEqual(check_result, 'select user from usertable limit 1;')

def test_filter_sql_with_limit_min(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable limit 10'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=100)
self.assertEqual(check_result, 'select user from usertable limit 10;')

def test_filter_sql_with_limit_offset(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable limit 10 offset 100'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1)
self.assertEqual(check_result, 'select user from usertable limit 1 offset 100;')

def test_filter_sql_with_limit_nn(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'select user from usertable limit 10, 100'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1)
self.assertEqual(check_result, 'select user from usertable limit 10,1;')

def test_filter_sql_upper(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'SELECT USER FROM usertable LIMIT 10, 100'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1)
self.assertEqual(check_result, 'SELECT USER FROM usertable limit 10,1;')

def test_filter_sql_not_select(self):
new_engine = ClickHouseEngine(instance=self.ins1)
sql_without_limit = 'show create table usertable;'
check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1)
self.assertEqual(check_result, 'show create table usertable;')
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class Meta:
('mongo', 'Mongo'),
('phoenix', 'Phoenix'),
('odps', 'ODPS'),
('clickhouse', 'ClickHouse'),
('goinception', 'goInception'))


Expand Down
1 change: 1 addition & 0 deletions sql/templates/instance.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<option value="mongo">Mongo</option>
<option value="phoenix">Phoenix</option>
<option value="odps">ODPS</option>
<option value="clickhouse">ClickHouse</option>
</select>
</div>
<div class="form-group">
Expand Down
4 changes: 4 additions & 0 deletions sql/templates/queryapplylist.html
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ <h4 class="modal-title" id="myModalLabel">申请数据库查询权限</h4>
<optgroup id="optgroup-mongo" label="Mongo"></optgroup>
<optgroup id="optgroup-phoenix" label="Phoenix"></optgroup>
<optgroup id="optgroup-odps" label="ODPS"></optgroup>
<optgroup id="optgroup-clickhouse" label="ClickHouse"></optgroup>
</select>
</div>
<div class="form-group">
Expand Down Expand Up @@ -165,6 +166,7 @@ <h4 class="modal-title text-danger">工单日志</h4>
$("#optgroup-mongo").empty();
$("#optgroup-phoenix").empty();
$("#optgroup-odps").empty();
$("#optgroup-clickhouse").empty();
for (var i = 0; i < result.length; i++) {
var instance = "<option value=\"" + result[i]['instance_name'] + "\">" + result[i]['instance_name'] + "</option>";
if (result[i]['db_type'] === 'mysql') {
Expand All @@ -183,6 +185,8 @@ <h4 class="modal-title text-danger">工单日志</h4>
$("#optgroup-phoenix").append(instance);
} else if (result[i]['db_type'] === 'odps') {
$("#optgroup-odps").append(instance);
} else if (result[i]['db_type'] === 'clickhouse') {
$("#optgroup-clickhouse").append(instance);
}
}
$('#instance_name').selectpicker('render');
Expand Down
Loading