Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加在线查询超时时间配置,超时连接主动关闭 #125

Merged
merged 4 commits into from
Apr 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ <h5 style="color: darkgrey"><b>SQL查询</b></h5>
</div>
</div>
</div>
<div class="form-group">
<label for="max_execution_time"
class="col-sm-4 control-label">MAX_EXECUTION_TIME</label>
<div class="col-sm-5">
<input type="number" class="form-control" id="max_execution_time"
key="max_execution_time"
value="{{ config.max_execution_time }}"
placeholder="在线查询超时时间阈值,单位秒,默认60">
</div>
</div>
<div class="form-group">
<label for="admin_query_limit"
class="col-sm-4 control-label">ADMIN_QUERY_LIMIT</label>
Expand Down
81 changes: 49 additions & 32 deletions sql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from django.db import connection, OperationalError
from django.db.models import Q
from django.http import HttpResponse
from django_q.tasks import async_task, fetch

from common.config import SysConfig
from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.query_privileges import query_priv_check
from .models import QueryLog, Instance
from sql.engines import get_engine
from sql.engines import get_engine, ResultSet

logger = logging.getLogger('default')

Expand Down Expand Up @@ -48,6 +49,7 @@ def query(request):
return HttpResponse(json.dumps(result), content_type='application/json')

try:
config = SysConfig()
# 查询前的检查,禁用语句检查,语句切分
query_engine = get_engine(instance=instance)
query_check_info = query_engine.query_check(db_name=db_name, sql=sql_content)
Expand All @@ -56,7 +58,7 @@ def query(request):
result['status'] = 1
result['msg'] = query_check_info.get('msg')
return HttpResponse(json.dumps(result), content_type='application/json')
if query_check_info.get('has_star') and SysConfig().get('disable_star') is True:
if query_check_info.get('has_star') and config.get('disable_star') is True:
# 引擎内部判断为有 * 且禁止 * 选项打开
result['status'] = 1
result['msg'] = query_check_info.get('msg')
Expand All @@ -79,50 +81,65 @@ def query(request):
# 对查询sql增加limit限制或者改写语句
sql_content = query_engine.filter_sql(sql=sql_content, limit_num=limit_num)

# 执行查询语句,统计执行时间
t_start = time.time()
query_result = query_engine.query(db_name=str(db_name), sql=sql_content, limit_num=limit_num)
t_end = time.time()
query_result.query_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 执行查询语句,timeout=max_execution_time
max_execution_time = int(config.get('max_execution_time', 60))
query_task_id = async_task(query_engine.query, db_name=str(db_name), sql=sql_content, limit_num=limit_num,
timeout=max_execution_time, cached=60)
# 等待执行结果,max_execution_time后还没有返回结果代表将会被终止
query_task = fetch(query_task_id, wait=max_execution_time * 1000, cached=True)
# 在max_execution_time内执行结束
if query_task:
if query_task.success:
query_result = query_task.result
query_result.query_time = query_task.time_taken()
else:
query_result = ResultSet(full_sql=sql_content)
query_result.error = query_task.result
# 等待超时,async_task主动关闭连接
else:
query_result = ResultSet(full_sql=sql_content)
query_result.error = f'查询时间超过 {max_execution_time} 秒,已被主动终止,请优化语句或者联系管理员。'

# 数据脱敏,仅对查询无错误的结果集进行脱敏
if SysConfig().get('data_masking') and query_result.error is None:
try:
# 记录脱敏时间
t_start = time.time()
masking_result = query_engine.query_masking(db_name=db_name, sql=sql_content, resultset=query_result)
t_end = time.time()
masking_result.mask_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 脱敏出错,并且开启query_check,直接返回异常,禁止执行
if masking_result.error and SysConfig().get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 脱敏出错,关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
elif masking_result.error and not SysConfig().get('query_check'):
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 查询异常
if query_result.error:
result['status'] = 1
result['msg'] = query_result.error
# 数据脱敏,仅对查询无错误的结果集进行脱敏,并且按照query_check配置是否返回
elif config.get('data_masking'):
query_masking_task_id = async_task(query_engine.query_masking, db_name=db_name, sql=sql_content,
resultset=query_result, cached=60)
query_masking_task = fetch(query_masking_task_id, wait=60 * 1000, cached=True)
if query_masking_task.success:
masking_result = query_masking_task.result
masking_result.mask_time = query_masking_task.time_taken()
# 脱敏出错
if masking_result.error:
# 开启query_check,直接返回异常,禁止执行
if config.get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 正常脱敏
else:
result['data'] = masking_result.__dict__
except Exception as e:
else:
logger.error(f'数据脱敏异常,查询语句:{sql_content}\n,错误信息:{traceback.format_exc()}')
# 抛出未定义异常,并且开启query_check,直接返回异常,禁止执行
if SysConfig().get('query_check'):
if config.get('query_check'):
result['status'] = 1
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{e}'
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{query_masking_task.result}'
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 无需脱敏的语句
else:
if query_result.error:
result['status'] = 1
result['msg'] = query_result.error
else:
result['data'] = query_result.__dict__
result['data'] = query_result.__dict__

# 仅将成功的查询语句记录存入数据库
if not query_result.error:
Expand Down
13 changes: 6 additions & 7 deletions sql/sql_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from sql.notify import notify_for_audit
from sql.models import ResourceGroup, Users
from sql.utils.resource_group import user_groups, user_instances
from sql.utils.jobs import add_sqlcronjob, del_sqlcronjob
from sql.utils.tasks import add_sql_schedule, del_schedule
from sql.utils.sql_review import can_timingtask, can_cancel, can_execute
from sql.utils.sql_utils import get_syntax_type
from sql.utils.workflow_audit import Audit
from .models import SqlWorkflow, SqlWorkflowContent, Instance
from django_q.tasks import async_task
Expand Down Expand Up @@ -308,7 +307,7 @@ def timing_task(request):
return render(request, 'error.html', context)

run_date = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M")
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"

# 使用事务保持数据一致性
try:
Expand All @@ -317,7 +316,7 @@ def timing_task(request):
workflow_detail.status = 'workflow_timingtask'
workflow_detail.save()
# 调用添加定时任务
add_sqlcronjob(job_id, run_date, workflow_id)
add_sql_schedule(task_name, run_date, workflow_id)
# 增加工单日志
audit_id = Audit.detail_by_workflow_id(workflow_id=workflow_id,
workflow_type=WorkflowDict.workflow_type[
Expand Down Expand Up @@ -396,10 +395,10 @@ def cancel(request):
else:
raise PermissionDenied

# 删除定时执行job
# 删除定时执行task
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
del_sqlcronjob(job_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"
del_schedule(task_name)
# 将流程状态修改为人工终止流程
workflow_detail.status = 'workflow_abort'
workflow_detail.save()
Expand Down
32 changes: 19 additions & 13 deletions sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def test_user_query_priv_no_query_mgtpriv(self):
self.assertEqual(json.loads(r.content), {"total": 0, "rows": []})


class TestQuery(TestCase):
class TestQuery(TransactionTestCase):
def setUp(self):
self.slave1 = Instance(instance_name='test_slave_instance', type='slave', db_type='mysql',
host='testhost', port=3306, user='mysql_user', password='mysql_password')
Expand All @@ -625,10 +625,11 @@ def tearDown(self):
archer_config = SysConfig()
archer_config.set('disable_star', False)

@patch('sql.query.fetch')
@patch('sql.query.async_task')
@patch('sql.engines.mysql.MysqlEngine.query')
@patch('sql.engines.mysql.MysqlEngine.query_masking')
@patch('sql.query.query_priv_check')
def testCorrectSQL(self, _priv_check, _query_masking, _query):
def testCorrectSQL(self, _priv_check, _query, _async_task, _fetch):
c = Client()
some_sql = 'select some from some_table limit 100;'
some_db = 'some_db'
Expand All @@ -643,22 +644,24 @@ def testCorrectSQL(self, _priv_check, _query_masking, _query):
q_result = ResultSet(full_sql=some_sql, rows=['value'])
q_result.column_list = ['some']

_query.return_value = q_result
_query_masking.return_value = q_result
_async_task.return_value = q_result
_fetch.return_value.result = q_result
_priv_check.return_value = {'status': 0, 'data': {'limit_num': 100, 'priv_check': True}}
r = c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': some_sql,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=some_sql, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=some_sql, limit_num=some_limit, timeout=60,
cached=60)
r_json = r.json()
self.assertEqual(r_json['data']['rows'], ['value'])
self.assertEqual(r_json['data']['column_list'], ['some'])

@patch('sql.query.fetch')
@patch('sql.query.async_task')
@patch('sql.engines.mysql.MysqlEngine.query')
@patch('sql.engines.mysql.MysqlEngine.query_masking')
@patch('sql.query.query_priv_check')
def testSQLWithoutLimit(self, _priv_check, _query_masking, _query):
def testSQLWithoutLimit(self, _priv_check, _query, _async_task, _fetch):
c = Client()
some_limit = 100
sql_without_limit = 'select some from some_table'
Expand All @@ -667,27 +670,30 @@ def testSQLWithoutLimit(self, _priv_check, _query_masking, _query):
c.force_login(self.u2)
q_result = ResultSet(full_sql=sql_without_limit, rows=['value'])
q_result.column_list = ['some']
_query.return_value = q_result
_query_masking.return_value = q_result
_async_task.return_value = q_result
_fetch.return_value.result = q_result
_fetch.return_value.time_taken.return_value = 1
_priv_check.return_value = {'status': 0, 'data': {'limit_num': 100, 'priv_check': True}}
r = c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': sql_without_limit,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=sql_with_limit, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=sql_with_limit, limit_num=some_limit,
timeout=60, cached=60)
r_json = r.json()
self.assertEqual(r_json['data']['rows'], ['value'])
self.assertEqual(r_json['data']['column_list'], ['some'])

# 带 * 且不带 limit 的sql
sql_with_star = 'select * from some_table'
filtered_sql_with_star = 'select * from some_table limit {0};'.format(some_limit)
_query.reset_mock()
_async_task.reset_mock()
c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': sql_with_star,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=filtered_sql_with_star, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=filtered_sql_with_star, limit_num=some_limit,
timeout=60, cached=60)

@patch('sql.query.query_priv_check')
def testStarOptionOn(self, _priv_check):
Expand Down
4 changes: 2 additions & 2 deletions sql/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sql.sql_optimize
from common import auth, config, workflow, dashboard, check
from sql import views, sql_workflow, sql_analyze, query, slowlog, instance, db_diagnostic, resource_group, binlog
from sql.utils import jobs
from sql.utils import tasks

urlpatterns = [
path('', views.sqlworkflow),
Expand Down Expand Up @@ -53,7 +53,7 @@
path('sqlworkflow_list/', sql_workflow.sql_workflow_list),
path('simplecheck/', sql_workflow.check),
path('getWorkflowStatus/', sql_workflow.get_workflow_status),
path('del_sqlcronjob/', jobs.del_sqlcronjob),
path('del_sqlcronjob/', tasks.del_schedule),

path('sql_analyze/generate/', sql_analyze.generate),
path('sql_analyze/analyze/', sql_analyze.analyze),
Expand Down
33 changes: 0 additions & 33 deletions sql/utils/jobs.py

This file was deleted.

35 changes: 35 additions & 0 deletions sql/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding:utf-8 -*-
from django_q.tasks import schedule
from django_q.models import Schedule

import logging

logger = logging.getLogger('default')


def add_sql_schedule(name, run_date, workflow_id):
    """Create (or replace) the scheduled execution of a SQL workflow.

    Any schedule already stored under ``name`` is removed first, so calling
    this again with the same name effectively reschedules the workflow.

    :param name: name under which the schedule is stored and later looked up
    :param run_date: value passed to django-q as the schedule's ``next_run``
    :param workflow_id: positional argument handed to
        ``sql.utils.execute_sql.execute`` when the schedule fires
    """
    # Remove a previous schedule with this name so the new one replaces it.
    del_schedule(name)

    schedule_options = {
        'hook': 'sql.utils.execute_sql.execute_callback',
        'name': name,
        'schedule_type': 'O',
        'next_run': run_date,
        'repeats': 1,
    }
    schedule('sql.utils.execute_sql.execute', workflow_id, **schedule_options)
    logger.debug(f"添加SQL定时执行任务:{name} 执行时间:{run_date}")


def del_schedule(name):
    """Delete the schedule stored under ``name``.

    A missing schedule is not an error: the miss is only logged at debug
    level and the function returns normally.

    :param name: name of the schedule to delete
    """
    try:
        target = Schedule.objects.get(name=name)
        target.delete()
    except Schedule.DoesNotExist:
        logger.debug(f'删除task:{name}失败,任务不存在')
    else:
        logger.debug(f'删除task:{name}')


def task_info(name):
    """Look up the schedule stored under ``name``.

    :param name: name of the schedule to fetch
    :return: the matching ``Schedule`` object, or ``None`` when no schedule
        with that name exists
    """
    try:
        return Schedule.objects.get(name=name)
    except Schedule.DoesNotExist:
        return None
4 changes: 2 additions & 2 deletions sql/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sql.engines import get_engine
from common.utils.permission import superuser_required
from sql.engines.models import ReviewResult, ReviewSet
from sql.utils.jobs import job_info
from sql.utils.tasks import task_info

from .models import Users, SqlWorkflow, QueryPrivileges, ResourceGroup, \
QueryPrivilegesApply, Config, SQL_WORKFLOW_CHOICES
Expand Down Expand Up @@ -94,7 +94,7 @@ def detail(request, workflow_id):
# 获取定时执行任务信息
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
job = job_info(job_id)
job = task_info(job_id)
if job:
run_date = job.next_run
else:
Expand Down