Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

正常设置自动通过的工单 #2369

Merged
merged 3 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sql/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ def archive_apply(request):
except AuditException as e:
logger.error(f"新建审批流失败: {str(e)}")
return JsonResponse({"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}})
audit_handler.workflow.status = audit_handler.audit.current_status
if audit_handler.audit.current_status == WorkflowStatus.PASSED:
audit_handler.workflow.state = True
audit_handler.workflow.save()
async_task(
notify_for_audit,
workflow_audit=audit_handler.audit,
Expand All @@ -217,6 +221,7 @@ def archive_apply(request):
"data": {
"workflow_status": audit_handler.audit.current_status,
"audit_id": audit_handler.audit.audit_id,
"archive_id": audit_handler.workflow.id,
},
}
)
Expand Down
301 changes: 301 additions & 0 deletions sql/test_archiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
import json
from unittest.mock import patch

from django.conf import settings
from django.contrib.auth.models import Permission
from django.test import TestCase, Client

from common.config import SysConfig
from common.utils.const import WorkflowStatus, WorkflowType
from sql.utils.workflow_audit import AuditSetting
from sql.archiver import add_archive_task, archive
from sql.models import (
Instance,
ResourceGroup,
ArchiveConfig,
WorkflowAudit,
WorkflowAuditSetting,
)
from sql.tests import User


class TestArchiver(TestCase):
"""
测试Archive
"""

def setUp(self):
self.superuser = User.objects.create(username="super", is_superuser=True)
self.u1 = User.objects.create(username="u1", is_superuser=False)
self.u2 = User.objects.create(username="u2", is_superuser=False)
menu_archive = Permission.objects.get(codename="menu_archive")
archive_review = Permission.objects.get(codename="archive_review")
self.u1.user_permissions.add(menu_archive)
self.u2.user_permissions.add(menu_archive)
self.u2.user_permissions.add(archive_review)
# 使用 travis.ci 时实例和测试service保持一致
self.ins = Instance.objects.create(
instance_name="test_instance",
type="master",
db_type="mysql",
host=settings.DATABASES["default"]["HOST"],
port=settings.DATABASES["default"]["PORT"],
user=settings.DATABASES["default"]["USER"],
password=settings.DATABASES["default"]["PASSWORD"],
)
self.res_group = ResourceGroup.objects.create(
group_id=1, group_name="group_name"
)
self.archive_apply = ArchiveConfig.objects.create(
title="title",
resource_group=self.res_group,
audit_auth_groups="some_audit_group",
src_instance=self.ins,
src_db_name="src_db_name",
src_table_name="src_table_name",
dest_instance=self.ins,
dest_db_name="src_db_name",
dest_table_name="src_table_name",
condition="1=1",
mode="file",
no_delete=True,
sleep=1,
status=WorkflowStatus.WAITING,
state=False,
user_name="some_user",
user_display="display",
)
self.audit_flow = WorkflowAudit.objects.create(
group_id=1,
group_name="g1",
workflow_id=self.archive_apply.id,
workflow_type=WorkflowType.ARCHIVE,
workflow_title="123",
audit_auth_groups="123",
current_audit="",
next_audit="",
current_status=WorkflowStatus.WAITING,
create_user="",
create_user_display="",
)
self.sys_config = SysConfig()
self.client = Client()

def tearDown(self):
User.objects.all().delete()
ResourceGroup.objects.all().delete()
ArchiveConfig.objects.all().delete()
WorkflowAuditSetting.objects.all().delete()
self.ins.delete()
self.sys_config.purge()

def test_archive_list_super(self):
"""
测试管理员获取归档申请列表
:return:
"""
data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"}
self.client.force_login(self.superuser)
r = self.client.get(path="/archive/list/", data=data)
self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []})

def test_archive_list_own(self):
"""
测试非管理员和审核人获取归档申请列表
:return:
"""
data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"}
self.client.force_login(self.u1)
r = self.client.get(path="/archive/list/", data=data)
self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []})

def test_archive_list_review(self):
"""
测试审核人获取归档申请列表
:return:
"""
data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"}
self.client.force_login(self.u2)
r = self.client.get(path="/archive/list/", data=data)
self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []})

def test_archive_apply_not_param(self):
"""
测试申请归档实例数据,参数不完整
:return:
"""
data = {
"group_name": self.res_group.group_name,
"src_instance_name": self.ins.instance_name,
"src_db_name": "src_db_name",
"src_table_name": "src_table_name",
"mode": "dest",
"dest_instance_name": self.ins.instance_name,
"dest_db_name": "dest_db_name",
"dest_table_name": "dest_table_name",
"condition": "1=1",
"no_delete": "true",
"sleep": 10,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/apply/", data=data)
self.assertDictEqual(
json.loads(r.content), {"status": 1, "msg": "请填写完整!", "data": {}}
)

def test_archive_apply_not_dest_param(self):
"""
测试申请归档实例数据,目标实例不完整
:return:
"""
data = {
"title": "title",
"group_name": self.res_group.group_name,
"src_instance_name": self.ins.instance_name,
"src_db_name": "src_db_name",
"src_table_name": "src_table_name",
"mode": "dest",
"condition": "1=1",
"no_delete": "true",
"sleep": 10,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/apply/", data=data)
self.assertDictEqual(
json.loads(r.content), {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}}
)

def test_archive_apply_not_exist_review(self):
"""
测试申请归档实例数据,未配置审批流程
:return:
"""
data = {
"title": "title",
"group_name": self.res_group.group_name,
"src_instance_name": self.ins.instance_name,
"src_db_name": "src_db_name",
"src_table_name": "src_table_name",
"mode": "dest",
"dest_instance_name": self.ins.instance_name,
"dest_db_name": "dest_db_name",
"dest_table_name": "dest_table_name",
"condition": "1=1",
"no_delete": "true",
"sleep": 10,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/apply/", data=data)
self.assertDictEqual(
json.loads(r.content),
{"data": {}, "msg": "新建审批流失败, 请联系管理员", "status": 1},
)

@patch("sql.archiver.async_task")
def test_archive_apply(self, _async_task):
"""
测试申请归档实例数据
:return:
"""
WorkflowAuditSetting.objects.create(
workflow_type=3, group_id=1, audit_auth_groups="1"
)
data = {
"title": "title",
"group_name": self.res_group.group_name,
"src_instance_name": self.ins.instance_name,
"src_db_name": "src_db_name",
"src_table_name": "src_table_name",
"mode": "dest",
"dest_instance_name": self.ins.instance_name,
"dest_db_name": "dest_db_name",
"dest_table_name": "dest_table_name",
"condition": "1=1",
"no_delete": "true",
"sleep": 10,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/apply/", data=data)
self.assertEqual(json.loads(r.content)["status"], 0)

@patch("sql.utils.workflow_audit.AuditV2.generate_audit_setting")
def test_archive_apply_auto_pass(self, mock_generate_setting):
mock_generate_setting.return_value = AuditSetting(
auto_pass=True,
)
data = {
"title": "title",
"group_name": self.res_group.group_name,
"src_instance_name": self.ins.instance_name,
"src_db_name": "src_db_name",
"src_table_name": "src_table_name",
"mode": "dest",
"dest_instance_name": self.ins.instance_name,
"dest_db_name": "dest_db_name",
"dest_table_name": "dest_table_name",
"condition": "1=1",
"no_delete": "true",
"sleep": 10,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/apply/", data=data)
return_data = r.json()
self.assertEqual(return_data["status"], 0)
archive_config = ArchiveConfig.objects.get(id=return_data["data"]["archive_id"])
assert archive_config.state == True
assert archive_config.status == WorkflowStatus.PASSED

@patch("sql.utils.workflow_audit.AuditV2.operate")
@patch("sql.archiver.async_task")
def test_archive_audit(self, _async_task, mock_operate):
"""
测试审核归档实例数据
:return:
"""
mock_operate.return_value = None
data = {
"archive_id": self.archive_apply.id,
"audit_status": WorkflowStatus.PASSED,
"audit_remark": "xxxx",
}
# operate 被 patch 了, 这里强制设置一下, 走一下流程
self.audit_flow.current_status = WorkflowStatus.PASSED
self.audit_flow.save()
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/audit/", data=data)
self.assertRedirects(
r, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False
)
self.archive_apply.refresh_from_db()
assert self.archive_apply.state == True
assert self.archive_apply.status == WorkflowStatus.PASSED

@patch("sql.archiver.async_task")
def test_add_archive_task(self, _async_task):
"""
测试添加异步归档任务
:return:
"""
add_archive_task()

@patch("sql.archiver.async_task")
def test_add_archive(self, _async_task):
"""
测试执行归档任务
:return:
"""
with self.assertRaises(Exception):
archive(self.archive_apply.id)

@patch("sql.archiver.async_task")
def test_archive_log(self, _async_task):
"""
测试获取归档日志
:return:
"""
data = {
"archive_id": self.archive_apply.id,
}
self.client.force_login(self.superuser)
r = self.client.post(path="/archive/log/", data=data)
self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []})
Loading
Loading