Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added async operation to import data sources #1915

Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ad42eb5
feat: added async operation to import data sources
rolin999 Sep 2, 2024
46abd74
feat: added async operation to import data sources
rolin999 Sep 3, 2024
9e299ef
feat: added async operation to import data sources
rolin999 Sep 3, 2024
5009422
resolve: fixed code style and test case
rolin999 Sep 3, 2024
2388f66
Merge branch 'main' into add-async-operation-for-import-data-sources
rolin999 Sep 3, 2024
c5a34d7
resolve: added test case
rolin999 Sep 3, 2024
f97ab21
resolve: fix code style and test case
rolin999 Sep 5, 2024
de9e329
resolve: added abstract class for temporary storage
rolin999 Sep 5, 2024
bc71e11
resolve: added abstract for temporary storage
rolin999 Sep 6, 2024
de434d9
resolve: change code style
rolin999 Sep 9, 2024
1b55f36
resolve: adjust the code structure and test case
rolin999 Sep 11, 2024
9d818e7
resolve: change port in start.sh
rolin999 Sep 11, 2024
76139fc
resolve: added _ensure_only_basic_type_in_kwargs fucntion
rolin999 Sep 11, 2024
f8f2a9a
resolve: fix code style
rolin999 Sep 11, 2024
fc16211
resolve: fix code style
rolin999 Sep 11, 2024
4476989
resolve: fix code style
rolin999 Sep 11, 2024
e9d4fdc
resolve: fix code style
rolin999 Sep 11, 2024
55d9ded
resolve: fix test case
rolin999 Sep 12, 2024
ecbde0c
Merge branch 'main' into add-async-operation-for-import-data-sources
rolin999 Sep 12, 2024
ac261b3
resolve: fix code style and test case
rolin999 Sep 12, 2024
c8380c7
resolve: fix code style and test case
rolin999 Sep 12, 2024
ae7b06c
resolve: fix code style and test case
rolin999 Sep 12, 2024
1ca267d
resolve: fix code style and test case
rolin999 Sep 18, 2024
d451c77
resolve: fix code style and test case
rolin999 Sep 20, 2024
d377700
resolve: fix code style
rolin999 Sep 20, 2024
9d37d5d
resolve: fix code style
rolin999 Sep 20, 2024
5282a9a
resolve: fix code style
rolin999 Sep 23, 2024
4a96475
resolve: fix code style
rolin999 Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/bk-user/bkuser/apis/web/data_source/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,7 @@ def post(self, request, *args, **kwargs):
operator=request.user.username,
overwrite=data["overwrite"],
incremental=data["incremental"],
# FIXME (su) 本地数据源导入也要改成异步行为,但是要解决 excel 如何传递的问题
async_run=False,
async_run=True,
trigger=SyncTaskTrigger.MANUAL,
)

Expand Down
6 changes: 6 additions & 0 deletions src/bk-user/bkuser/apps/sync/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.runners import DataSourceSyncTaskRunner, TenantSyncTaskRunner
from bkuser.apps.sync.tasks import sync_data_source, sync_tenant
from bkuser.common.storage import TemporaryStorage


class DataSourceSyncManager:
Expand Down Expand Up @@ -49,6 +50,11 @@ def execute(self, plugin_init_extra_kwargs: Optional[Dict[str, Any]] = None) ->
)

if self.sync_options.async_run:
if self.data_source.is_local:
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
storage = TemporaryStorage()
identifier_key = storage.save_workbook(plugin_init_extra_kwargs.get("workbook"))
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
plugin_init_extra_kwargs = {"task_key": identifier_key}
rolin999 marked this conversation as resolved.
Show resolved Hide resolved

self._ensure_only_basic_type_in_kwargs(plugin_init_extra_kwargs)
sync_data_source.apply_async(args=[task.id, plugin_init_extra_kwargs], soft_time_limit=self.sync_timeout)
else:
Expand Down
22 changes: 22 additions & 0 deletions src/bk-user/bkuser/apps/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import logging
from typing import Any, Dict

Expand All @@ -16,10 +17,12 @@
from bkuser.apps.data_source.models import DataSource
from bkuser.apps.notification.constants import NotificationScene
from bkuser.apps.notification.notifier import TenantUserNotifier
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.runners import DataSourceSyncTaskRunner, TenantSyncTaskRunner
from bkuser.apps.tenant.models import TenantUser
from bkuser.celery import app
from bkuser.common.storage import TemporaryStorage
from bkuser.common.task import BaseTask

logger = logging.getLogger(__name__)
Expand All @@ -30,6 +33,25 @@ def sync_data_source(task_id: int, plugin_init_extra_kwargs: Dict[str, Any]):
"""同步数据源数据"""
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
logger.info("[celery] receive data source sync task: %s", task_id)
task = DataSourceSyncTask.objects.get(id=task_id)
data_source = task.data_source
rolin999 marked this conversation as resolved.
Show resolved Hide resolved

if not data_source.is_local:
logger.debug("not local data source, skip data source sync task")
return
rolin999 marked this conversation as resolved.
Show resolved Hide resolved

# 若已指定原始数据 Key,则需要从缓存中获取数据
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
if task_raw_data_key := plugin_init_extra_kwargs.get("task_key"):
storage = TemporaryStorage()
try:
workbook = storage.get_workbook(task_raw_data_key)
except ValueError:
task.status = SyncTaskStatus.FAILED
task.logs = f"data source sync task {task_id} require raw data in cache"
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
task.save(update_fields=["status", "logs", "updated_at"])
return

plugin_init_extra_kwargs = {"workbook": workbook}

DataSourceSyncTaskRunner(task, plugin_init_extra_kwargs).run()
rolin999 marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
3 changes: 3 additions & 0 deletions src/bk-user/bkuser/common/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import functools

from blue_krill.data_types.enum import EnumField, StructuredEnum
Expand Down Expand Up @@ -38,6 +39,8 @@ class CacheKeyPrefixEnum(str, StructuredEnum):
VERIFICATION_CODE = "vc"
# 用户重置密码用 Token
RESET_PASSWORD_TOKEN = "rpt"
# 临时存储
TEMPORARY_STORAGE = "ts"
rolin999 marked this conversation as resolved.
Show resolved Hide resolved


def _default_key_function(*args, **kwargs):
Expand Down
76 changes: 76 additions & 0 deletions src/bk-user/bkuser/common/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import base64
import io

from openpyxl import load_workbook
from openpyxl.workbook import Workbook

from bkuser.common.cache import Cache, CacheEnum, CacheKeyPrefixEnum
from bkuser.utils.uuid import generate_uuid

# 临时存储数据的过期时间
TemporaryStorageDefaultTimeout = 10 * 60


class TemporaryStorage:
"""数据源同步任务时 redis 临时存储"""
rolin999 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self):
self.storage = Cache(CacheEnum.REDIS, CacheKeyPrefixEnum.TEMPORARY_STORAGE)
rolin999 marked this conversation as resolved.
Show resolved Hide resolved

def save(self, data: bytes, timeout: int = TemporaryStorageDefaultTimeout) -> str:
"""
保存临时数据
:param data: 二进制数据
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
:param timeout: 过期时间
:return: 临时数据唯一标识
"""
# 生成临时数据的唯一标识,用于后续查询
temporary_storage_id = generate_uuid()

encoded_data = base64.b64encode(data).decode("utf-8")
self.storage.set(temporary_storage_id, encoded_data, timeout)

return temporary_storage_id

def get(self, temporary_storage_id: str) -> bytes:
"""
获取临时数据
:param temporary_storage_id: 临时数据唯一标识
:return: 二进制数据
"""
encoded_data = self.storage.get(temporary_storage_id)
if not encoded_data:
raise ValueError(f"data(id={temporary_storage_id}) not found in temporary storage")

# 获取成功则删除,无需等待过期
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
self.storage.delete(temporary_storage_id)

return base64.b64decode(encoded_data)

def save_workbook(self, workbook: Workbook) -> str:
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
"""[快捷方法] 将 Excel workbook 保存到临时存储中,并返回临时存储的数据唯一标识"""

# 将 workbook 保存到 内存字节流,便于获取到字节内容
with io.BytesIO() as buffer:
workbook.save(buffer)
data = buffer.getvalue()

return self.save(data)

def get_workbook(self, temporary_storage_id: str) -> Workbook:
"""[快捷方法] 从临时存储中获取临时数据并转换为 Excel Workbook"""

data = self.get(temporary_storage_id)

return load_workbook(filename=io.BytesIO(data))
27 changes: 27 additions & 0 deletions src/bk-user/tests/apis/web/data_source/test_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
from bkuser.apps.data_source.models import DataSource, DataSourceDepartment, DataSourceSensitiveInfo, DataSourceUser
from bkuser.apps.idp.constants import INVALID_REAL_DATA_SOURCE_ID, IdpStatus
from bkuser.apps.idp.models import Idp, IdpSensitiveInfo
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask
from bkuser.plugins.constants import DataSourcePluginEnum
from bkuser.plugins.local.constants import PasswordGenerateMethod
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test.utils import override_settings
from django.urls import reverse
from rest_framework import status

Expand Down Expand Up @@ -441,3 +446,25 @@ def test_retrieve_other_tenant_data_source_sync_record(self, api_client, data_so
other_tenant_task = data_source_sync_tasks[2]
resp = api_client.get(reverse("data_source.sync_record.retrieve", kwargs={"id": other_tenant_task.id}))
assert resp.status_code == status.HTTP_404_NOT_FOUND


class TestDataSourceImportApi:
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
def test_data_source_import_success(self, api_client, data_source):
with override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True):
with open(settings.BASE_DIR / "tests/assets/fake_users.xlsx", "rb") as excel_file:
uploaded_file = SimpleUploadedFile(
name="fake_users.xlsx",
content=excel_file.read(),
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
resp = api_client.post(
reverse("data_source.import_from_excel", kwargs={"id": data_source.id}),
data={"overwrite": False, "incremental": True, "file": uploaded_file},
format="multipart",
)
sync_task = DataSourceSyncTask.objects.get(data_source=data_source)

assert resp.status_code == status.HTTP_200_OK
assert sync_task.status == SyncTaskStatus.SUCCESS
assert DataSourceUser.objects.filter(data_source_id=data_source.id).exists()
assert DataSourceDepartment.objects.filter(data_source_id=data_source.id).exists()
20 changes: 20 additions & 0 deletions src/bk-user/tests/apps/sync/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
specific language governing permissions and limitations under the License.
"""

import base64
import io

import pytest
from bkuser.apps.sync.constants import SyncTaskStatus, SyncTaskTrigger
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from django.conf import settings
from django.utils import timezone
from openpyxl.reader.excel import load_workbook
from openpyxl.workbook import Workbook


@pytest.fixture()
Expand Down Expand Up @@ -42,3 +48,17 @@ def tenant_sync_task(full_local_data_source, default_tenant, data_source_sync_ta
start_at=timezone.now(),
extras={"async_run": False},
)


@pytest.fixture()
def user_workbook() -> Workbook:
return load_workbook(settings.BASE_DIR / "tests/assets/fake_users.xlsx")


@pytest.fixture()
def encoded_file(user_workbook) -> str:
with io.BytesIO() as buffer:
user_workbook.save(buffer)
content = buffer.getvalue()

return base64.b64encode(content).decode("utf-8")
43 changes: 43 additions & 0 deletions src/bk-user/tests/apps/sync/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import pytest
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask
from bkuser.apps.sync.tasks import sync_data_source
from bkuser.common.cache import Cache, CacheEnum, CacheKeyPrefixEnum

pytestmark = pytest.mark.django_db


class TestSyncDataSourceTask:
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
def test_success(self, data_source_sync_task, encoded_file):
cache = Cache(CacheEnum.REDIS, CacheKeyPrefixEnum.TEMPORARY_STORAGE)
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
task_id = data_source_sync_task.id
task_key = "test_key"

cache.set(task_key, encoded_file)
plugin_init_extra_kwargs = {"task_key": task_key}
sync_data_source(task_id, plugin_init_extra_kwargs)

task = DataSourceSyncTask.objects.get(id=task_id)
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
assert task.status == SyncTaskStatus.SUCCESS

def test_file_not_found(self, data_source_sync_task):
task_id = data_source_sync_task.id
task_key = "non_existing_key"

plugin_init_extra_kwargs = {"task_key": task_key}
sync_data_source(task_id, plugin_init_extra_kwargs)

task = DataSourceSyncTask.objects.get(id=task_id)
assert task.status == SyncTaskStatus.FAILED
assert f"data source sync task {task_id} require raw data in cache" in task.logs