From 9183bccdbfef740dac1cdb6593c471932d6cf7c7 Mon Sep 17 00:00:00 2001 From: Leo Q Date: Tue, 7 Nov 2023 13:53:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AD=A3=E5=B8=B8=E8=AE=BE=E7=BD=AE=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E9=80=9A=E8=BF=87=E7=9A=84=E5=B7=A5=E5=8D=95=20(#2369?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 正常设置自动通过的工单 * 补充自动通过的单元测试 * black --- sql/archiver.py | 5 + sql/test_archiver.py | 301 +++++++++++++++++++++++++++++++++++++++++ sql/tests.py | 259 ----------------------------------- sql_api/serializers.py | 6 +- sql_api/tests.py | 20 +++ 5 files changed, 331 insertions(+), 260 deletions(-) create mode 100644 sql/test_archiver.py diff --git a/sql/archiver.py b/sql/archiver.py index 7b9416135c..e27d545470 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -204,6 +204,10 @@ def archive_apply(request): except AuditException as e: logger.error(f"新建审批流失败: {str(e)}") return JsonResponse({"status": 1, "msg": "新建审批流失败, 请联系管理员", "data": {}}) + audit_handler.workflow.status = audit_handler.audit.current_status + if audit_handler.audit.current_status == WorkflowStatus.PASSED: + audit_handler.workflow.state = True + audit_handler.workflow.save() async_task( notify_for_audit, workflow_audit=audit_handler.audit, @@ -217,6 +221,7 @@ def archive_apply(request): "data": { "workflow_status": audit_handler.audit.current_status, "audit_id": audit_handler.audit.audit_id, + "archive_id": audit_handler.workflow.id, }, } ) diff --git a/sql/test_archiver.py b/sql/test_archiver.py new file mode 100644 index 0000000000..33e444e0f3 --- /dev/null +++ b/sql/test_archiver.py @@ -0,0 +1,301 @@ +import json +from unittest.mock import patch + +from django.conf import settings +from django.contrib.auth.models import Permission +from django.test import TestCase, Client + +from common.config import SysConfig +from common.utils.const import WorkflowStatus, WorkflowType +from sql.utils.workflow_audit import AuditSetting +from sql.archiver import add_archive_task, archive +from sql.models import ( + Instance, + ResourceGroup, + ArchiveConfig, + WorkflowAudit, + WorkflowAuditSetting, +) +from sql.tests import User + + +class TestArchiver(TestCase): + """ + 测试Archive + """ + + def setUp(self): + self.superuser = User.objects.create(username="super", is_superuser=True) + self.u1 = User.objects.create(username="u1", is_superuser=False) + self.u2 = User.objects.create(username="u2", is_superuser=False) + menu_archive = Permission.objects.get(codename="menu_archive") + archive_review = Permission.objects.get(codename="archive_review") + self.u1.user_permissions.add(menu_archive) + self.u2.user_permissions.add(menu_archive) + self.u2.user_permissions.add(archive_review) + # 使用 travis.ci 时实例和测试service保持一致 + self.ins = Instance.objects.create( + instance_name="test_instance", + type="master", + db_type="mysql", + host=settings.DATABASES["default"]["HOST"], + port=settings.DATABASES["default"]["PORT"], + user=settings.DATABASES["default"]["USER"], + password=settings.DATABASES["default"]["PASSWORD"], + ) + self.res_group = ResourceGroup.objects.create( + group_id=1, group_name="group_name" + ) + self.archive_apply = ArchiveConfig.objects.create( + title="title", + resource_group=self.res_group, + audit_auth_groups="some_audit_group", + src_instance=self.ins, + src_db_name="src_db_name", + src_table_name="src_table_name", + dest_instance=self.ins, + dest_db_name="src_db_name", + dest_table_name="src_table_name", + condition="1=1", + mode="file", + no_delete=True, + sleep=1, + status=WorkflowStatus.WAITING, + state=False, + user_name="some_user", + user_display="display", + ) + self.audit_flow = WorkflowAudit.objects.create( + group_id=1, + group_name="g1", + workflow_id=self.archive_apply.id, + workflow_type=WorkflowType.ARCHIVE, + workflow_title="123", + audit_auth_groups="123", + current_audit="", + next_audit="", + current_status=WorkflowStatus.WAITING, + create_user="", + create_user_display="", + ) + self.sys_config = SysConfig() + self.client = Client() + + def tearDown(self): + User.objects.all().delete() + ResourceGroup.objects.all().delete() + ArchiveConfig.objects.all().delete() + WorkflowAuditSetting.objects.all().delete() + self.ins.delete() + self.sys_config.purge() + + def test_archive_list_super(self): + """ + 测试管理员获取归档申请列表 + :return: + """ + data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} + self.client.force_login(self.superuser) + r = self.client.get(path="/archive/list/", data=data) + self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) + + def test_archive_list_own(self): + """ + 测试非管理员和审核人获取归档申请列表 + :return: + """ + data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} + self.client.force_login(self.u1) + r = self.client.get(path="/archive/list/", data=data) + self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) + + def test_archive_list_review(self): + """ + 测试审核人获取归档申请列表 + :return: + """ + data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} + self.client.force_login(self.u2) + r = self.client.get(path="/archive/list/", data=data) + self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) + + def test_archive_apply_not_param(self): + """ + 测试申请归档实例数据,参数不完整 + :return: + """ + data = { + "group_name": self.res_group.group_name, + "src_instance_name": self.ins.instance_name, + "src_db_name": "src_db_name", + "src_table_name": "src_table_name", + "mode": "dest", + "dest_instance_name": self.ins.instance_name, + "dest_db_name": "dest_db_name", + "dest_table_name": "dest_table_name", + "condition": "1=1", + "no_delete": "true", + "sleep": 10, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/apply/", data=data) + self.assertDictEqual( + json.loads(r.content), {"status": 1, "msg": "请填写完整!", "data": {}} + ) + + def test_archive_apply_not_dest_param(self): + """ + 测试申请归档实例数据,目标实例不完整 + :return: + """ + data = { + "title": "title", + "group_name": self.res_group.group_name, + "src_instance_name": self.ins.instance_name, + "src_db_name": "src_db_name", + "src_table_name": "src_table_name", + "mode": "dest", + "condition": "1=1", + "no_delete": "true", + "sleep": 10, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/apply/", data=data) + self.assertDictEqual( + json.loads(r.content), {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}} + ) + + def test_archive_apply_not_exist_review(self): + """ + 测试申请归档实例数据,未配置审批流程 + :return: + """ + data = { + "title": "title", + "group_name": self.res_group.group_name, + "src_instance_name": self.ins.instance_name, + "src_db_name": "src_db_name", + "src_table_name": "src_table_name", + "mode": "dest", + "dest_instance_name": self.ins.instance_name, + "dest_db_name": "dest_db_name", + "dest_table_name": "dest_table_name", + "condition": "1=1", + "no_delete": "true", + "sleep": 10, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/apply/", data=data) + self.assertDictEqual( + json.loads(r.content), + {"data": {}, "msg": "新建审批流失败, 请联系管理员", "status": 1}, + ) + + @patch("sql.archiver.async_task") + def test_archive_apply(self, _async_task): + """ + 测试申请归档实例数据 + :return: + """ + WorkflowAuditSetting.objects.create( + workflow_type=3, group_id=1, audit_auth_groups="1" + ) + data = { + "title": "title", + "group_name": self.res_group.group_name, + "src_instance_name": self.ins.instance_name, + "src_db_name": "src_db_name", + "src_table_name": "src_table_name", + "mode": "dest", + "dest_instance_name": self.ins.instance_name, + "dest_db_name": "dest_db_name", + "dest_table_name": "dest_table_name", + "condition": "1=1", + "no_delete": "true", + "sleep": 10, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/apply/", data=data) + self.assertEqual(json.loads(r.content)["status"], 0) + + @patch("sql.utils.workflow_audit.AuditV2.generate_audit_setting") + def test_archive_apply_auto_pass(self, mock_generate_setting): + mock_generate_setting.return_value = AuditSetting( + auto_pass=True, + ) + data = { + "title": "title", + "group_name": self.res_group.group_name, + "src_instance_name": self.ins.instance_name, + "src_db_name": "src_db_name", + "src_table_name": "src_table_name", + "mode": "dest", + "dest_instance_name": self.ins.instance_name, + "dest_db_name": "dest_db_name", + "dest_table_name": "dest_table_name", + "condition": "1=1", + "no_delete": "true", + "sleep": 10, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/apply/", data=data) + return_data = r.json() + self.assertEqual(return_data["status"], 0) + archive_config = ArchiveConfig.objects.get(id=return_data["data"]["archive_id"]) + assert archive_config.state == True + assert archive_config.status == WorkflowStatus.PASSED + + @patch("sql.utils.workflow_audit.AuditV2.operate") + @patch("sql.archiver.async_task") + def test_archive_audit(self, _async_task, mock_operate): + """ + 测试审核归档实例数据 + :return: + """ + mock_operate.return_value = None + data = { + "archive_id": self.archive_apply.id, + "audit_status": WorkflowStatus.PASSED, + "audit_remark": "xxxx", + } + # operate 被 patch 了, 这里强制设置一下, 走一下流程 + self.audit_flow.current_status = WorkflowStatus.PASSED + self.audit_flow.save() + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/audit/", data=data) + self.assertRedirects( + r, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False + ) + self.archive_apply.refresh_from_db() + assert self.archive_apply.state == True + assert self.archive_apply.status == WorkflowStatus.PASSED + + @patch("sql.archiver.async_task") + def test_add_archive_task(self, _async_task): + """ + 测试添加异步归档任务 + :return: + """ + add_archive_task() + + @patch("sql.archiver.async_task") + def test_add_archive(self, _async_task): + """ + 测试执行归档任务 + :return: + """ + with self.assertRaises(Exception): + archive(self.archive_apply.id) + + @patch("sql.archiver.async_task") + def test_archive_log(self, _async_task): + """ + 测试获取归档日志 + :return: + """ + data = { + "archive_id": self.archive_apply.id, + } + self.client.force_login(self.superuser) + r = self.client.post(path="/archive/log/", data=data) + self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) diff --git a/sql/tests.py b/sql/tests.py index 9777acac36..ad4c61bed8 100644 --- a/sql/tests.py +++ b/sql/tests.py @@ -10,7 +10,6 @@ import sql.query_privileges from common.config import SysConfig from common.utils.const import WorkflowStatus, WorkflowType -from sql.archiver import add_archive_task, archive from sql.binlog import my2sql_file from sql.engines.models import ResultSet from sql.utils.execute_sql import execute_callback @@ -27,9 +26,6 @@ WorkflowAudit, QueryLog, WorkflowLog, - WorkflowAuditSetting, - ArchiveConfig, - WorkflowAuditDetail, ) from sql.utils.workflow_audit import AuditException @@ -1918,261 +1914,6 @@ def test_schema_sync(self): self.assertEqual(json.loads(r.content)["status"], 0) -class TestArchiver(TestCase): - """ - 测试Archive - """ - - def setUp(self): - self.superuser = User.objects.create(username="super", is_superuser=True) - self.u1 = User.objects.create(username="u1", is_superuser=False) - self.u2 = User.objects.create(username="u2", is_superuser=False) - menu_archive = Permission.objects.get(codename="menu_archive") - archive_review = Permission.objects.get(codename="archive_review") - self.u1.user_permissions.add(menu_archive) - self.u2.user_permissions.add(menu_archive) - self.u2.user_permissions.add(archive_review) - # 使用 travis.ci 时实例和测试service保持一致 - self.ins = Instance.objects.create( - instance_name="test_instance", - type="master", - db_type="mysql", - host=settings.DATABASES["default"]["HOST"], - port=settings.DATABASES["default"]["PORT"], - user=settings.DATABASES["default"]["USER"], - password=settings.DATABASES["default"]["PASSWORD"], - ) - self.res_group = ResourceGroup.objects.create( - group_id=1, group_name="group_name" - ) - self.archive_apply = ArchiveConfig.objects.create( - title="title", - resource_group=self.res_group, - audit_auth_groups="some_audit_group", - src_instance=self.ins, - src_db_name="src_db_name", - src_table_name="src_table_name", - dest_instance=self.ins, - dest_db_name="src_db_name", - dest_table_name="src_table_name", - condition="1=1", - mode="file", - no_delete=True, - sleep=1, - status=WorkflowStatus.WAITING, - state=False, - user_name="some_user", - user_display="display", - ) - self.audit_flow = WorkflowAudit.objects.create( - group_id=1, - group_name="g1", - workflow_id=self.archive_apply.id, - workflow_type=WorkflowType.ARCHIVE, - workflow_title="123", - audit_auth_groups="123", - current_audit="", - next_audit="", - current_status=WorkflowStatus.WAITING, - create_user="", - create_user_display="", - ) - self.sys_config = SysConfig() - self.client = Client() - - def tearDown(self): - User.objects.all().delete() - ResourceGroup.objects.all().delete() - ArchiveConfig.objects.all().delete() - WorkflowAuditSetting.objects.all().delete() - self.ins.delete() - self.sys_config.purge() - - def test_archive_list_super(self): - """ - 测试管理员获取归档申请列表 - :return: - """ - data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} - self.client.force_login(self.superuser) - r = self.client.get(path="/archive/list/", data=data) - self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) - - def test_archive_list_own(self): - """ - 测试非管理员和审核人获取归档申请列表 - :return: - """ - data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} - self.client.force_login(self.u1) - r = self.client.get(path="/archive/list/", data=data) - self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) - - def test_archive_list_review(self): - """ - 测试审核人获取归档申请列表 - :return: - """ - data = {"filter_instance_id": self.ins.id, "state": "false", "search": "text"} - self.client.force_login(self.u2) - r = self.client.get(path="/archive/list/", data=data) - self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) - - def test_archive_apply_not_param(self): - """ - 测试申请归档实例数据,参数不完整 - :return: - """ - data = { - "group_name": self.res_group.group_name, - "src_instance_name": self.ins.instance_name, - "src_db_name": "src_db_name", - "src_table_name": "src_table_name", - "mode": "dest", - "dest_instance_name": self.ins.instance_name, - "dest_db_name": "dest_db_name", - "dest_table_name": "dest_table_name", - "condition": "1=1", - "no_delete": "true", - "sleep": 10, - } - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/apply/", data=data) - self.assertDictEqual( - json.loads(r.content), {"status": 1, "msg": "请填写完整!", "data": {}} - ) - - def test_archive_apply_not_dest_param(self): - """ - 测试申请归档实例数据,目标实例不完整 - :return: - """ - data = { - "title": "title", - "group_name": self.res_group.group_name, - "src_instance_name": self.ins.instance_name, - "src_db_name": "src_db_name", - "src_table_name": "src_table_name", - "mode": "dest", - "condition": "1=1", - "no_delete": "true", - "sleep": 10, - } - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/apply/", data=data) - self.assertDictEqual( - json.loads(r.content), {"status": 1, "msg": "归档到实例时目标实例信息必选!", "data": {}} - ) - - def test_archive_apply_not_exist_review(self): - """ - 测试申请归档实例数据,未配置审批流程 - :return: - """ - data = { - "title": "title", - "group_name": self.res_group.group_name, - "src_instance_name": self.ins.instance_name, - "src_db_name": "src_db_name", - "src_table_name": "src_table_name", - "mode": "dest", - "dest_instance_name": self.ins.instance_name, - "dest_db_name": "dest_db_name", - "dest_table_name": "dest_table_name", - "condition": "1=1", - "no_delete": "true", - "sleep": 10, - } - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/apply/", data=data) - self.assertDictEqual( - json.loads(r.content), - {"data": {}, "msg": "新建审批流失败, 请联系管理员", "status": 1}, - ) - - @patch("sql.archiver.async_task") - def test_archive_apply(self, _async_task): - """ - 测试申请归档实例数据 - :return: - """ - WorkflowAuditSetting.objects.create( - workflow_type=3, group_id=1, audit_auth_groups="1" - ) - data = { - "title": "title", - "group_name": self.res_group.group_name, - "src_instance_name": self.ins.instance_name, - "src_db_name": "src_db_name", - "src_table_name": "src_table_name", - "mode": "dest", - "dest_instance_name": self.ins.instance_name, - "dest_db_name": "dest_db_name", - "dest_table_name": "dest_table_name", - "condition": "1=1", - "no_delete": "true", - "sleep": 10, - } - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/apply/", data=data) - self.assertEqual(json.loads(r.content)["status"], 0) - - @patch("sql.utils.workflow_audit.AuditV2.operate") - @patch("sql.archiver.async_task") - def test_archive_audit(self, _async_task, mock_operate): - """ - 测试审核归档实例数据 - :return: - """ - mock_operate.return_value = None - data = { - "archive_id": self.archive_apply.id, - "audit_status": WorkflowStatus.PASSED, - "audit_remark": "xxxx", - } - # operate 被 patch 了, 这里强制设置一下, 走一下流程 - self.audit_flow.current_status = WorkflowStatus.PASSED - self.audit_flow.save() - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/audit/", data=data) - self.assertRedirects( - r, f"/archive/{self.archive_apply.id}/", fetch_redirect_response=False - ) - self.archive_apply.refresh_from_db() - assert self.archive_apply.state == True - assert self.archive_apply.status == WorkflowStatus.PASSED - - @patch("sql.archiver.async_task") - def test_add_archive_task(self, _async_task): - """ - 测试添加异步归档任务 - :return: - """ - add_archive_task() - - @patch("sql.archiver.async_task") - def test_add_archive(self, _async_task): - """ - 测试执行归档任务 - :return: - """ - with self.assertRaises(Exception): - archive(self.archive_apply.id) - - @patch("sql.archiver.async_task") - def test_archive_log(self, _async_task): - """ - 测试获取归档日志 - :return: - """ - data = { - "archive_id": self.archive_apply.id, - } - self.client.force_login(self.superuser) - r = self.client.post(path="/archive/log/", data=data) - self.assertDictEqual(json.loads(r.content), {"total": 0, "rows": []}) - - class TestAsync(TestCase): def setUp(self): self.now = datetime.now() diff --git a/sql_api/serializers.py b/sql_api/serializers.py index 1b87f45615..67834a58c2 100644 --- a/sql_api/serializers.py +++ b/sql_api/serializers.py @@ -20,7 +20,7 @@ from sql.engines import get_engine from sql.utils.workflow_audit import Audit, get_auditor from sql.utils.resource_group import user_instances -from common.utils.const import WorkflowType +from common.utils.const import WorkflowType, WorkflowStatus from common.config import SysConfig import traceback import logging @@ -428,6 +428,10 @@ def create(self, validated_data): except Exception as e: logger.error(f"提交工单报错,错误信息:{traceback.format_exc()}") raise serializers.ValidationError({"errors": str(e)}) + # 有时候提交后自动审批通过, 在这里改写一下 workflow 状态 + if auditor.audit.current_status == WorkflowStatus.PASSED: + auditor.workflow.status = "workflow_review_pass" + auditor.workflow.save() return workflow_content class Meta: diff --git a/sql_api/tests.py b/sql_api/tests.py index 995f940605..ee56da6256 100644 --- a/sql_api/tests.py +++ b/sql_api/tests.py @@ -7,6 +7,7 @@ from rest_framework.test import APITestCase from rest_framework import status from common.config import SysConfig +from sql.utils.workflow_audit import AuditSetting from sql.engines import ReviewSet from sql.engines.models import ReviewResult from sql.models import ( @@ -589,6 +590,25 @@ def test_submit_workflow_super(self): self.assertEqual(r.json()["workflow"]["engineer"], user2.username) self.assertEqual(r.json()["workflow"]["engineer_display"], user2.display) + @patch("sql.utils.workflow_audit.AuditV2.generate_audit_setting") + def test_submit_workflow_auto_pass(self, mock_generate_settings): + json_data = { + "workflow": { + "workflow_name": "上线工单1", + "demand_url": "test", + "group_id": 1, + "db_name": "test_db", + "instance": self.ins.id, + }, + "sql_content": "alter table abc add column note varchar(64);", + } + mock_generate_settings.return_value = AuditSetting(auto_pass=True) + r = self.client.post("/api/v1/workflow/", json_data, format="json") + return_data = r.json() + self.assertEqual(r.status_code, status.HTTP_201_CREATED) + workflow_in_db = SqlWorkflow.objects.get(id=return_data["workflow"]["id"]) + assert workflow_in_db.status == "workflow_review_pass" + def test_submit_param_is_None(self): """测试SQL提交,参数内容为空""" json_data = {