Skip to content

Commit

Permalink
Merge pull request #125 from hhyo/max_execution_time
Browse files Browse the repository at this point in the history
增加在线查询超时时间配置,超时连接主动关闭
  • Loading branch information
hhyo authored Apr 14, 2019
2 parents 2edf460 + f36e9f1 commit ddfa689
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 89 deletions.
10 changes: 10 additions & 0 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ <h5 style="color: darkgrey"><b>SQL查询</b></h5>
</div>
</div>
</div>
{# Online query timeout threshold, in seconds (placeholder states the default is 60). #}
<div class="form-group">
    {# "for" must reference the input id below so the label is bound to this control. #}
    <label for="max_execution_time"
           class="col-sm-4 control-label">MAX_EXECUTION_TIME</label>
    <div class="col-sm-5">
        <input type="number" class="form-control" id="max_execution_time"
               key="max_execution_time"
               value="{{ config.max_execution_time }}"
               placeholder="在线查询超时时间阈值,单位秒,默认60">
    </div>
</div>
<div class="form-group">
<label for="admin_query_limit"
class="col-sm-4 control-label">ADMIN_QUERY_LIMIT</label>
Expand Down
81 changes: 49 additions & 32 deletions sql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from django.db import connection, OperationalError
from django.db.models import Q
from django.http import HttpResponse
from django_q.tasks import async_task, fetch

from common.config import SysConfig
from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.query_privileges import query_priv_check
from .models import QueryLog, Instance
from sql.engines import get_engine
from sql.engines import get_engine, ResultSet

logger = logging.getLogger('default')

Expand Down Expand Up @@ -48,6 +49,7 @@ def query(request):
return HttpResponse(json.dumps(result), content_type='application/json')

try:
config = SysConfig()
# 查询前的检查,禁用语句检查,语句切分
query_engine = get_engine(instance=instance)
query_check_info = query_engine.query_check(db_name=db_name, sql=sql_content)
Expand All @@ -56,7 +58,7 @@ def query(request):
result['status'] = 1
result['msg'] = query_check_info.get('msg')
return HttpResponse(json.dumps(result), content_type='application/json')
if query_check_info.get('has_star') and SysConfig().get('disable_star') is True:
if query_check_info.get('has_star') and config.get('disable_star') is True:
# 引擎内部判断为有 * 且禁止 * 选项打开
result['status'] = 1
result['msg'] = query_check_info.get('msg')
Expand All @@ -79,50 +81,65 @@ def query(request):
# 对查询sql增加limit限制或者改写语句
sql_content = query_engine.filter_sql(sql=sql_content, limit_num=limit_num)

# 执行查询语句,统计执行时间
t_start = time.time()
query_result = query_engine.query(db_name=str(db_name), sql=sql_content, limit_num=limit_num)
t_end = time.time()
query_result.query_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 执行查询语句,timeout=max_execution_time
max_execution_time = int(config.get('max_execution_time', 60))
query_task_id = async_task(query_engine.query, db_name=str(db_name), sql=sql_content, limit_num=limit_num,
timeout=max_execution_time, cached=60)
# 等待执行结果,max_execution_time后还没有返回结果代表将会被终止
query_task = fetch(query_task_id, wait=max_execution_time * 1000, cached=True)
# 在max_execution_time内执行结束
if query_task:
if query_task.success:
query_result = query_task.result
query_result.query_time = query_task.time_taken()
else:
query_result = ResultSet(full_sql=sql_content)
query_result.error = query_task.result
# 等待超时,async_task主动关闭连接
else:
query_result = ResultSet(full_sql=sql_content)
query_result.error = f'查询时间超过 {max_execution_time} 秒,已被主动终止,请优化语句或者联系管理员。'

# 数据脱敏,仅对查询无错误的结果集进行脱敏
if SysConfig().get('data_masking') and query_result.error is None:
try:
# 记录脱敏时间
t_start = time.time()
masking_result = query_engine.query_masking(db_name=db_name, sql=sql_content, resultset=query_result)
t_end = time.time()
masking_result.mask_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 脱敏出错,并且开启query_check,直接返回异常,禁止执行
if masking_result.error and SysConfig().get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 脱敏出错,关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
elif masking_result.error and not SysConfig().get('query_check'):
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 查询异常
if query_result.error:
result['status'] = 1
result['msg'] = query_result.error
# 数据脱敏,仅对查询无错误的结果集进行脱敏,并且按照query_check配置是否返回
elif config.get('data_masking'):
query_masking_task_id = async_task(query_engine.query_masking, db_name=db_name, sql=sql_content,
resultset=query_result, cached=60)
query_masking_task = fetch(query_masking_task_id, wait=60 * 1000, cached=True)
if query_masking_task.success:
masking_result = query_masking_task.result
masking_result.mask_time = query_masking_task.time_taken()
# 脱敏出错
if masking_result.error:
# 开启query_check,直接返回异常,禁止执行
if config.get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 正常脱敏
else:
result['data'] = masking_result.__dict__
except Exception as e:
else:
logger.error(f'数据脱敏异常,查询语句:{sql_content}\n,错误信息:{traceback.format_exc()}')
# 抛出未定义异常,并且开启query_check,直接返回异常,禁止执行
if SysConfig().get('query_check'):
if config.get('query_check'):
result['status'] = 1
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{e}'
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{query_masking_task.result}'
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 无需脱敏的语句
else:
if query_result.error:
result['status'] = 1
result['msg'] = query_result.error
else:
result['data'] = query_result.__dict__
result['data'] = query_result.__dict__

# 仅将成功的查询语句记录存入数据库
if not query_result.error:
Expand Down
13 changes: 6 additions & 7 deletions sql/sql_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from sql.notify import notify_for_audit
from sql.models import ResourceGroup, Users
from sql.utils.resource_group import user_groups, user_instances
from sql.utils.jobs import add_sqlcronjob, del_sqlcronjob
from sql.utils.tasks import add_sql_schedule, del_schedule
from sql.utils.sql_review import can_timingtask, can_cancel, can_execute
from sql.utils.sql_utils import get_syntax_type
from sql.utils.workflow_audit import Audit
from .models import SqlWorkflow, SqlWorkflowContent, Instance
from django_q.tasks import async_task
Expand Down Expand Up @@ -308,7 +307,7 @@ def timing_task(request):
return render(request, 'error.html', context)

run_date = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M")
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"

# 使用事务保持数据一致性
try:
Expand All @@ -317,7 +316,7 @@ def timing_task(request):
workflow_detail.status = 'workflow_timingtask'
workflow_detail.save()
# 调用添加定时任务
add_sqlcronjob(job_id, run_date, workflow_id)
add_sql_schedule(task_name, run_date, workflow_id)
# 增加工单日志
audit_id = Audit.detail_by_workflow_id(workflow_id=workflow_id,
workflow_type=WorkflowDict.workflow_type[
Expand Down Expand Up @@ -396,10 +395,10 @@ def cancel(request):
else:
raise PermissionDenied

# 删除定时执行job
# 删除定时执行task
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
del_sqlcronjob(job_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"
del_schedule(task_name)
# 将流程状态修改为人工终止流程
workflow_detail.status = 'workflow_abort'
workflow_detail.save()
Expand Down
32 changes: 19 additions & 13 deletions sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def test_user_query_priv_no_query_mgtpriv(self):
self.assertEqual(json.loads(r.content), {"total": 0, "rows": []})


class TestQuery(TestCase):
class TestQuery(TransactionTestCase):
def setUp(self):
self.slave1 = Instance(instance_name='test_slave_instance', type='slave', db_type='mysql',
host='testhost', port=3306, user='mysql_user', password='mysql_password')
Expand All @@ -625,10 +625,11 @@ def tearDown(self):
archer_config = SysConfig()
archer_config.set('disable_star', False)

@patch('sql.query.fetch')
@patch('sql.query.async_task')
@patch('sql.engines.mysql.MysqlEngine.query')
@patch('sql.engines.mysql.MysqlEngine.query_masking')
@patch('sql.query.query_priv_check')
def testCorrectSQL(self, _priv_check, _query_masking, _query):
def testCorrectSQL(self, _priv_check, _query, _async_task, _fetch):
c = Client()
some_sql = 'select some from some_table limit 100;'
some_db = 'some_db'
Expand All @@ -643,22 +644,24 @@ def testCorrectSQL(self, _priv_check, _query_masking, _query):
q_result = ResultSet(full_sql=some_sql, rows=['value'])
q_result.column_list = ['some']

_query.return_value = q_result
_query_masking.return_value = q_result
_async_task.return_value = q_result
_fetch.return_value.result = q_result
_priv_check.return_value = {'status': 0, 'data': {'limit_num': 100, 'priv_check': True}}
r = c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': some_sql,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=some_sql, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=some_sql, limit_num=some_limit, timeout=60,
cached=60)
r_json = r.json()
self.assertEqual(r_json['data']['rows'], ['value'])
self.assertEqual(r_json['data']['column_list'], ['some'])

@patch('sql.query.fetch')
@patch('sql.query.async_task')
@patch('sql.engines.mysql.MysqlEngine.query')
@patch('sql.engines.mysql.MysqlEngine.query_masking')
@patch('sql.query.query_priv_check')
def testSQLWithoutLimit(self, _priv_check, _query_masking, _query):
def testSQLWithoutLimit(self, _priv_check, _query, _async_task, _fetch):
c = Client()
some_limit = 100
sql_without_limit = 'select some from some_table'
Expand All @@ -667,27 +670,30 @@ def testSQLWithoutLimit(self, _priv_check, _query_masking, _query):
c.force_login(self.u2)
q_result = ResultSet(full_sql=sql_without_limit, rows=['value'])
q_result.column_list = ['some']
_query.return_value = q_result
_query_masking.return_value = q_result
_async_task.return_value = q_result
_fetch.return_value.result = q_result
_fetch.return_value.time_taken.return_value = 1
_priv_check.return_value = {'status': 0, 'data': {'limit_num': 100, 'priv_check': True}}
r = c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': sql_without_limit,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=sql_with_limit, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=sql_with_limit, limit_num=some_limit,
timeout=60, cached=60)
r_json = r.json()
self.assertEqual(r_json['data']['rows'], ['value'])
self.assertEqual(r_json['data']['column_list'], ['some'])

# 带 * 且不带 limit 的sql
sql_with_star = 'select * from some_table'
filtered_sql_with_star = 'select * from some_table limit {0};'.format(some_limit)
_query.reset_mock()
_async_task.reset_mock()
c.post('/query/', data={'instance_name': self.slave1.instance_name,
'sql_content': sql_with_star,
'db_name': some_db,
'limit_num': some_limit})
_query.assert_called_once_with(db_name=some_db, sql=filtered_sql_with_star, limit_num=some_limit)
_async_task.assert_called_once_with(_query, db_name=some_db, sql=filtered_sql_with_star, limit_num=some_limit,
timeout=60, cached=60)

@patch('sql.query.query_priv_check')
def testStarOptionOn(self, _priv_check):
Expand Down
4 changes: 2 additions & 2 deletions sql/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sql.sql_optimize
from common import auth, config, workflow, dashboard, check
from sql import views, sql_workflow, sql_analyze, query, slowlog, instance, db_diagnostic, resource_group, binlog
from sql.utils import jobs
from sql.utils import tasks

urlpatterns = [
path('', views.sqlworkflow),
Expand Down Expand Up @@ -53,7 +53,7 @@
path('sqlworkflow_list/', sql_workflow.sql_workflow_list),
path('simplecheck/', sql_workflow.check),
path('getWorkflowStatus/', sql_workflow.get_workflow_status),
path('del_sqlcronjob/', jobs.del_sqlcronjob),
path('del_sqlcronjob/', tasks.del_schedule),

path('sql_analyze/generate/', sql_analyze.generate),
path('sql_analyze/analyze/', sql_analyze.analyze),
Expand Down
33 changes: 0 additions & 33 deletions sql/utils/jobs.py

This file was deleted.

35 changes: 35 additions & 0 deletions sql/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding:utf-8 -*-
from django_q.tasks import schedule
from django_q.models import Schedule

import logging

logger = logging.getLogger('default')


def add_sql_schedule(name, run_date, workflow_id):
    """Create, or replace, the scheduled execution of a SQL workflow.

    Any schedule already stored under ``name`` is removed first, so calling
    this again for the same name acts as a modification.

    :param name: name under which the schedule is registered
    :param run_date: value passed to django-q as the schedule's ``next_run``
    :param workflow_id: positional argument handed to
        ``sql.utils.execute_sql.execute`` when the schedule fires
    """
    # Remove the previous schedule (if any) before registering the new one.
    del_schedule(name)

    schedule_options = {
        'hook': 'sql.utils.execute_sql.execute_callback',
        'name': name,
        'schedule_type': 'O',
        'next_run': run_date,
        'repeats': 1,
    }
    schedule('sql.utils.execute_sql.execute', workflow_id, **schedule_options)

    logger.debug(f"添加SQL定时执行任务:{name} 执行时间:{run_date}")


def del_schedule(name):
    """Delete every django-q schedule registered under ``name``.

    A queryset delete is used instead of ``Schedule.objects.get`` so that
    several schedules sharing the same name (nothing in this module enforces
    uniqueness) are all removed rather than raising an unhandled
    ``MultipleObjectsReturned``. A missing schedule is not an error; it is
    only logged at debug level.

    :param name: name of the schedule(s) to remove
    """
    # QuerySet.delete() returns (total_deleted, per_model_counts).
    deleted_count, _ = Schedule.objects.filter(name=name).delete()
    if deleted_count:
        logger.debug(f'删除task:{name}')
    else:
        logger.debug(f'删除task:{name}失败,任务不存在')


def task_info(name):
    """Look up the schedule stored under ``name``.

    :param name: schedule name to search for
    :return: the matching ``Schedule`` instance, or ``None`` when no
        schedule has that name
    """
    try:
        return Schedule.objects.get(name=name)
    except Schedule.DoesNotExist:
        # Absence is an expected outcome; callers test the result for truthiness.
        return None
4 changes: 2 additions & 2 deletions sql/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sql.engines import get_engine
from common.utils.permission import superuser_required
from sql.engines.models import ReviewResult, ReviewSet
from sql.utils.jobs import job_info
from sql.utils.tasks import task_info

from .models import Users, SqlWorkflow, QueryPrivileges, ResourceGroup, \
QueryPrivilegesApply, Config, SQL_WORKFLOW_CHOICES
Expand Down Expand Up @@ -94,7 +94,7 @@ def detail(request, workflow_id):
# 获取定时执行任务信息
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
job = job_info(job_id)
job = task_info(job_id)
if job:
run_date = job.next_run
else:
Expand Down

1 comment on commit ddfa689

@LeoQuote
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close #100

Please sign in to comment.