Skip to content

Commit

Permalink
新增my2sql工具插件模块 #1224 (#1314)
Browse files Browse the repository at this point in the history
* 新增my2sql工具插件模块
  • Loading branch information
nick2wang authored Jan 5, 2022
1 parent af13722 commit f5292f4
Show file tree
Hide file tree
Showing 15 changed files with 817 additions and 8 deletions.
5 changes: 5 additions & 0 deletions common/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@
<a href="/binlog2sql/">Binlog2SQL</a>
</li>
{% endif %}
{% if perms.sql.menu_my2sql %}
<li>
<a href="/my2sql/">My2SQL</a>
</li>
{% endif %}
{% if perms.sql.menu_schemasync %}
<li>
<a href="/schemasync/">SchemaSync</a>
Expand Down
11 changes: 11 additions & 0 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,17 @@ <h4 style="color: darkgrey"><b>其他配置</b></h4>
placeholder="binlog2sql调用路径,类似/opt/binlog2sql.py">
</div>
</div>
<div class="form-group">
<label for="my2sql"
class="col-sm-4 control-label">MY2SQL</label>
<div class="col-sm-5">
<input type="text" class="form-control"
id="my2sql"
key="my2sql"
value="{{ config.my2sql }}"
placeholder="my2sql调用路径,类似/opt/archery/src/plugins/my2sql">
</div>
</div>
<div class="form-group">
<label for="default_auth_group"
class="col-sm-4 control-label">DEFAULT_AUTH_GROUP</label>
Expand Down
143 changes: 140 additions & 3 deletions sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import time
import traceback
import shlex

import simplejson as json
from django.conf import settings
Expand All @@ -15,7 +16,8 @@
from sql.engines import get_engine

from sql.plugins.binglog2sql import Binlog2Sql
from sql.notify import notify_for_binlog2sql
from sql.plugins.my2sql import My2SQL
from sql.notify import notify_for_binlog2sql, notify_for_my2sql
from .models import Instance

logger = logging.getLogger('default')
Expand Down Expand Up @@ -112,7 +114,8 @@ def binlog2sql(request):
# 提交给binlog2sql进行解析
binlog2sql = Binlog2Sql()
# 准备参数
args = {"conn_options": fr"-h{instance.host} -u{instance.user} -p'{instance.password}' -P{instance.port} ",
args = {"conn_options": fr"-h{shlex.quote(str(instance.host))} -u{shlex.quote(str(instance.user))} \
-p'{shlex.quote(str(instance.password))}' -P{shlex.quote(str(instance.port))} ",
"stop_never": False,
"no-primary-key": no_pk,
"flashback": flashback,
Expand Down Expand Up @@ -190,7 +193,8 @@ def binlog2sql_file(args, user):
"""
binlog2sql = Binlog2Sql()
instance = args.get('instance')
conn_options = fr"-h{instance.host} -u{instance.user} -p'{instance.password}' -P{instance.port}"
conn_options = fr"-h{shlex.quote(str(instance.host))} -u{shlex.quote(str(instance.user))} \
-p'{shlex.quote(str(instance.password))}' -P{shlex.quote(str(instance.port))}"
args['conn_options'] = conn_options
timestamp = int(time.time())
path = os.path.join(settings.BASE_DIR, 'downloads/binlog2sql/')
Expand All @@ -208,3 +212,136 @@ def binlog2sql_file(args, user):
for c in iter(p.stdout.readline, ''):
f.write(c)
return user, filename


@permission_required('sql.menu_my2sql', raise_exception=True)
def my2sql(request):
"""
通过解析binlog获取SQL--使用my2sql
:param request:
:return:
"""
instance_name = request.POST.get('instance_name')
save_sql = True if request.POST.get('save_sql') == 'true' else False
instance = Instance.objects.get(instance_name=instance_name)
work_type = 'rollback' if request.POST.get('rollback') == 'true' else '2sql'
num = 30 if request.POST.get('num') == '' else int(request.POST.get('num'))
threads = 4 if request.POST.get('threads') == '' else int(request.POST.get('threads'))
start_file = request.POST.get('start_file')
start_pos = request.POST.get('start_pos') if request.POST.get('start_pos') == '' else int(
request.POST.get('start_pos'))
end_file = request.POST.get('end_file')
end_pos = request.POST.get('end_pos') if request.POST.get('end_pos') == '' else int(request.POST.get('end_pos'))
stop_time = request.POST.get('stop_time')
start_time = request.POST.get('start_time')
only_schemas = request.POST.getlist('only_schemas')
only_tables = request.POST.getlist('only_tables[]')
sql_type = [] if request.POST.getlist('sql_type[]') == [] else request.POST.getlist('sql_type[]')
extra_info = True if request.POST.get('extra_info') == 'true' else False
ignore_primary_key = True if request.POST.get('ignore_primary_key') == 'true' else False
full_columns = True if request.POST.get('full_columns') == 'true' else False
no_db_prefix = True if request.POST.get('no_db_prefix') == 'true' else False
file_per_table = True if request.POST.get('file_per_table') == 'true' else False

result = {'status': 0, 'msg': 'ok', 'data': []}

# 提交给my2sql进行解析
my2sql = My2SQL()

# 准备参数
args = {"conn_options": fr"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password '{shlex.quote(str(instance.password))}' -port {shlex.quote(str(instance.port))} ",
"work-type": work_type,
"start-file": start_file,
"start-pos": start_pos,
"stop-file": end_file,
"stop-pos": end_pos,
"start-datetime": start_time,
"stop-datetime": stop_time,
"databases": ' '.join(only_schemas),
"tables": ','.join(only_tables),
"sql": ','.join(sql_type),
"instance": instance,
"threads": threads,
"add-extraInfo": extra_info,
"ignore-primaryKey-forInsert": ignore_primary_key,
"full-columns": full_columns,
"do-not-add-prifixDb": no_db_prefix,
"file-per-table": file_per_table,
"output-toScreen": True
}

# 参数检查
args_check_result = my2sql.check_args(args)
if args_check_result['status'] == 1:
return HttpResponse(json.dumps(args_check_result), content_type='application/json')
# 参数转换
cmd_args = my2sql.generate_args2cmd(args, shell=True)

# 执行命令
try:
p = my2sql.execute_cmd(cmd_args, shell=True)
# 读取前num行后结束
rows = []
n = 1
for line in iter(p.stdout.readline, ''):
if n <= num and isinstance(line, str):
if line[0:6].upper() in ('INSERT', 'DELETE', 'UPDATE'):
n = n + 1
row_info = {}
try:
row_info['sql'] = line + ';'
except IndexError:
row_info['sql'] = line + ';'
rows.append(row_info)
else:
break

if rows.__len__() == 0:
# 判断是否有异常
stderr = p.stderr.read()
if stderr and isinstance(stderr, str):
result['status'] = 1
result['msg'] = stderr
return HttpResponse(json.dumps(result), content_type='application/json')
# 终止子进程
p.kill()
result['data'] = rows
except Exception as e:
logger.error(traceback.format_exc())
result['status'] = 1
result['msg'] = str(e)

# 异步保存到文件
if save_sql:
args.pop('conn_options')
args.pop('output-toScreen')
async_task(my2sql_file, args=args, user=request.user, hook=notify_for_my2sql, timeout=-1,
task_name=f'my2sql-{time.time()}')

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')


def my2sql_file(args, user):
"""
用于异步保存binlog解析的文件
:param args: 参数
:param user: 操作用户对象,用户消息推送
:return:
"""
my2sql = My2SQL()
instance = args.get('instance')
conn_options = fr"-host {shlex.quote(str(instance.host))} -user {shlex.quote(str(instance.user))} \
-password '{shlex.quote(str(instance.password))}' -port {shlex.quote(str(instance.port))} "
args['conn_options'] = conn_options
path = os.path.join(settings.BASE_DIR, 'downloads/my2sql/')
os.makedirs(path, exist_ok=True)

# 参数转换
args["output-dir"] = path
cmd_args = my2sql.generate_args2cmd(args, shell=True)
# 使用output-dir参数执行命令保存sql
my2sql.execute_cmd(cmd_args, shell=True)
return user, path
2 changes: 1 addition & 1 deletion sql/fixtures/auth_group.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sql
insert into auth_group_permissions (group_id, permission_id)
select 3,id
from auth_permission
where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sqlanalyze','menu_query','menu_sqlquery','menu_queryapplylist','menu_sqloptimize','menu_sqladvisor','menu_slowquery','menu_instance','menu_instance_list','menu_dbdiagnostic','menu_database','menu_instance_account','menu_param','menu_data_dictionary','menu_tools','menu_archive','menu_binlog2sql','menu_schemasync','menu_system','menu_document','sql_submit','sql_review','sql_execute_for_resource_group','sql_execute','sql_analyze','optimize_sqladvisor','optimize_sqltuning','optimize_soar','query_applypriv','query_mgtpriv','query_review','query_submit','query_all_instances','query_resource_group_instance','process_view','process_kill','tablespace_view','trx_view','trxandlocks_view','instance_account_manage','param_view','param_edit','data_dictionary_export','archive_apply','archive_review','archive_mgt');
where codename in ('menu_dashboard','menu_sqlcheck','menu_sqlworkflow','menu_sqlanalyze','menu_query','menu_sqlquery','menu_queryapplylist','menu_sqloptimize','menu_sqladvisor','menu_slowquery','menu_instance','menu_instance_list','menu_dbdiagnostic','menu_database','menu_instance_account','menu_param','menu_data_dictionary','menu_tools','menu_archive','menu_binlog2sql','menu_my2sql','menu_schemasync','menu_system','menu_document','sql_submit','sql_review','sql_execute_for_resource_group','sql_execute','sql_analyze','optimize_sqladvisor','optimize_sqltuning','optimize_soar','query_applypriv','query_mgtpriv','query_review','query_submit','query_all_instances','query_resource_group_instance','process_view','process_kill','tablespace_view','trx_view','trxandlocks_view','instance_account_manage','param_view','param_edit','data_dictionary_export','archive_apply','archive_review','archive_mgt');

-- PM
insert into auth_group_permissions (group_id, permission_id)
Expand Down
1 change: 1 addition & 0 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ class Meta:
('menu_tools', '菜单 工具插件'),
('menu_archive', '菜单 数据归档'),
('menu_binlog2sql', '菜单 Binlog2SQL'),
('menu_my2sql', '菜单 My2SQL'),
('menu_schemasync', '菜单 SchemaSync'),
('menu_system', '菜单 系统管理'),
('menu_document', '菜单 相关文档'),
Expand Down
20 changes: 20 additions & 0 deletions sql/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,23 @@ def notify_for_binlog2sql(task):
# 发送
msg_to = [task.kwargs['user']]
__send(msg_title, msg_content, msg_to)


def notify_for_my2sql(task):
"""
my2sql执行结束的通知
:param task:
:return:
"""
# 判断是否开启消息通知,未开启直接返回
if not __notify_cnf_status():
return None
if task.success:
msg_title = '[Archery 通知]My2SQL执行结束'
msg_content = f'解析的SQL文件在{task.result[1]}目录下,请前往查看'
else:
msg_title = '[Archery 通知]My2SQL执行失败'
msg_content = f'{task.result}'
# 发送
msg_to = [task.kwargs['user']]
__send(msg_title, msg_content, msg_to)
52 changes: 52 additions & 0 deletions sql/plugins/my2sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: UTF-8 -*-
from common.config import SysConfig
from sql.plugins.plugin import Plugin
import shlex


class My2SQL(Plugin):

def __init__(self):
self.path = SysConfig().get('my2sql')
self.required_args = []
self.disable_args = []
super(Plugin, self).__init__()

def generate_args2cmd(self, args, shell):
"""
转换请求参数为命令行
:param args:
:param shell:
:return:
"""
conn_options = ['conn_options']
args_options = ['work-type', 'threads', 'start-file', 'stop-file', 'start-pos',
'stop-pos', 'databases', 'tables', 'sql', 'output-dir']
no_args_options = ['output-toScreen', 'add-extraInfo', 'ignore-primaryKey-forInsert',
'full-columns', 'do-not-add-prifixDb', 'file-per-table']
datetime_options = ['start-datetime', 'stop-datetime']
if shell:
cmd_args = f'{shlex.quote(str(self.path))}' if self.path else ''
for name, value in args.items():
if name in conn_options:
cmd_args += f' {value}'
elif name in args_options and value:
cmd_args += f' -{name} {shlex.quote(str(value))}'
elif name in datetime_options and value:
cmd_args += f" -{name} '{shlex.quote(str(value))}'"
elif name in no_args_options and value:
cmd_args += f' -{name}'
else:
cmd_args = [self.path]
for name, value in args.items():
if name in conn_options:
cmd_args.append(f'{value}')
elif name in args_options:
cmd_args.append(f'-{name}')
cmd_args.append(f'{value}')
elif name in datetime_options:
cmd_args.append(f'-{name}')
cmd_args.append(f"'{value}'")
elif name in no_args_options:
cmd_args.append(f'-{name}')
return cmd_args
31 changes: 31 additions & 0 deletions sql/plugins/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.contrib.auth import get_user_model

from sql.plugins.binglog2sql import Binlog2Sql
from sql.plugins.my2sql import My2SQL
from sql.plugins.schemasync import SchemaSync
from sql.plugins.soar import Soar
from sql.plugins.sqladvisor import SQLAdvisor
Expand Down Expand Up @@ -199,6 +200,36 @@ def test_binlog2ql_generate_args2cmd(self):
cmd_args = binlog2sql.generate_args2cmd(args, True)
self.assertIsInstance(cmd_args, str)

def test_my2sql_generate_args2cmd(self):
"""
测试my2sql参数转换
:return:
"""
args = {'conn_options': "-host mysql -user root -password '123456' -port 3306 ",
'work-type': '2sql',
'start-file': 'mysql-bin.000043',
'start-pos': 111,
'stop-file': '',
'stop-pos': '',
'start-datetime': '',
'stop-datetime': '',
'databases': 'account_center',
'tables': 'ac_apps',
'sql': 'update',
"threads": 1,
"add-extraInfo": "false",
"ignore-primaryKey-forInsert": "false",
"full-columns": "false",
"do-not-add-prifixDb": "false",
"file-per-table": "false"}
self.sys_config.set('my2sql', '/opt/archery/src/plugins/my2sql')
self.sys_config.get_all_config()
my2sql = My2SQL()
cmd_args = my2sql.generate_args2cmd(args, False)
self.assertIsInstance(cmd_args, list)
cmd_args = my2sql.generate_args2cmd(args, True)
self.assertIsInstance(cmd_args, str)

def test_pt_archiver_generate_args2cmd(self):
"""
测试pt_archiver参数转换
Expand Down
Loading

0 comments on commit f5292f4

Please sign in to comment.