Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added async operation to import data sources #1915

Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ad42eb5
feat: added async operation to import data sources
rolin999 Sep 2, 2024
46abd74
feat: added async operation to import data sources
rolin999 Sep 3, 2024
9e299ef
feat: added async operation to import data sources
rolin999 Sep 3, 2024
5009422
resolve: fixed code style and test case
rolin999 Sep 3, 2024
2388f66
Merge branch 'main' into add-async-operation-for-import-data-sources
rolin999 Sep 3, 2024
c5a34d7
resolve: added test case
rolin999 Sep 3, 2024
f97ab21
resolve: fix code style and test case
rolin999 Sep 5, 2024
de9e329
resolve: added abstract class for temporary storage
rolin999 Sep 5, 2024
bc71e11
resolve: added abstract for temporary storage
rolin999 Sep 6, 2024
de434d9
resolve: change code style
rolin999 Sep 9, 2024
1b55f36
resolve: adjust the code structure and test case
rolin999 Sep 11, 2024
9d818e7
resolve: change port in start.sh
rolin999 Sep 11, 2024
76139fc
resolve: added _ensure_only_basic_type_in_kwargs function
rolin999 Sep 11, 2024
f8f2a9a
resolve: fix code style
rolin999 Sep 11, 2024
fc16211
resolve: fix code style
rolin999 Sep 11, 2024
4476989
resolve: fix code style
rolin999 Sep 11, 2024
e9d4fdc
resolve: fix code style
rolin999 Sep 11, 2024
55d9ded
resolve: fix test case
rolin999 Sep 12, 2024
ecbde0c
Merge branch 'main' into add-async-operation-for-import-data-sources
rolin999 Sep 12, 2024
ac261b3
resolve: fix code style and test case
rolin999 Sep 12, 2024
c8380c7
resolve: fix code style and test case
rolin999 Sep 12, 2024
ae7b06c
resolve: fix code style and test case
rolin999 Sep 12, 2024
1ca267d
resolve: fix code style and test case
rolin999 Sep 18, 2024
d451c77
resolve: fix code style and test case
rolin999 Sep 20, 2024
d377700
resolve: fix code style
rolin999 Sep 20, 2024
9d37d5d
resolve: fix code style
rolin999 Sep 20, 2024
5282a9a
resolve: fix code style
rolin999 Sep 23, 2024
4a96475
resolve: fix code style
rolin999 Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/bk-user/bkuser/apis/web/data_source/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,7 @@ def post(self, request, *args, **kwargs):
operator=request.user.username,
overwrite=data["overwrite"],
incremental=data["incremental"],
# FIXME (su) 本地数据源导入也要改成异步行为,但是要解决 excel 如何传递的问题
async_run=False,
async_run=True,
trigger=SyncTaskTrigger.MANUAL,
)

Expand Down
7 changes: 7 additions & 0 deletions src/bk-user/bkuser/apps/sync/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.runners import DataSourceSyncTaskRunner, TenantSyncTaskRunner
from bkuser.apps.sync.tasks import sync_data_source, sync_tenant
from bkuser.apps.sync.workbook_temp_store import WorkbookTempStore


class DataSourceSyncManager:
Expand Down Expand Up @@ -49,6 +50,12 @@ def execute(self, plugin_init_extra_kwargs: Optional[Dict[str, Any]] = None) ->
)

if self.sync_options.async_run:
# 若数据源是本地数据源,则将 Workbook 文件存储到临时存储中
if self.data_source.is_local:
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
storage = WorkbookTempStore()
temporary_storage_id = storage.save(plugin_init_extra_kwargs["workbook"])
plugin_init_extra_kwargs = {"temporary_storage_id": temporary_storage_id}

self._ensure_only_basic_type_in_kwargs(plugin_init_extra_kwargs)
sync_data_source.apply_async(args=[task.id, plugin_init_extra_kwargs], soft_time_limit=self.sync_timeout)
else:
Expand Down
17 changes: 17 additions & 0 deletions src/bk-user/bkuser/apps/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import logging
from typing import Any, Dict

Expand All @@ -16,8 +17,10 @@
from bkuser.apps.data_source.models import DataSource
from bkuser.apps.notification.constants import NotificationScene
from bkuser.apps.notification.notifier import TenantUserNotifier
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.runners import DataSourceSyncTaskRunner, TenantSyncTaskRunner
from bkuser.apps.sync.workbook_temp_store import WorkbookTempStore
from bkuser.apps.tenant.models import TenantUser
from bkuser.celery import app
from bkuser.common.task import BaseTask
Expand All @@ -30,6 +33,20 @@ def sync_data_source(task_id: int, plugin_init_extra_kwargs: Dict[str, Any]):
"""同步数据源数据"""
rolin999 marked this conversation as resolved.
Show resolved Hide resolved
logger.info("[celery] receive data source sync task: %s", task_id)
task = DataSourceSyncTask.objects.get(id=task_id)

if task.data_source.is_local and (temporary_storage_id := plugin_init_extra_kwargs.get("temporary_storage_id")):
# 若已指定临时存储的数据唯一标识,则需要从临时存储中获取数据
storage = WorkbookTempStore()
try:
workbook = storage.get_once(temporary_storage_id)
except ValueError:
task.status = SyncTaskStatus.FAILED
task.logs = f"data source sync task {task_id} require raw data in temporary storage, but not found"
task.save(update_fields=["status", "logs", "updated_at"])
return

plugin_init_extra_kwargs = {"workbook": workbook}

DataSourceSyncTaskRunner(task, plugin_init_extra_kwargs).run()
rolin999 marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
68 changes: 68 additions & 0 deletions src/bk-user/bkuser/apps/sync/workbook_temp_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import base64
import io

from openpyxl import load_workbook
from openpyxl.workbook import Workbook

from bkuser.common.cache import Cache, CacheEnum, CacheKeyPrefixEnum
from bkuser.utils.uuid import generate_uuid

# 临时存储数据的过期时间
TemporaryStorageDefaultTimeout = 10 * 60


class WorkbookTempStore:
    """Temporary storage for an Excel Workbook while it is being imported."""

    def __init__(self):
        # Backed by the redis cache for now; other backends (e.g. bk-repo) are to be added later
        self.storage = Cache(CacheEnum.REDIS, CacheKeyPrefixEnum.WORKBOOK_TEMPORARY_STORE)

    def save(self, workbook: Workbook, timeout: int = TemporaryStorageDefaultTimeout) -> str:
        """
        Put an Excel Workbook into temporary storage and return the identifier it was stored under

        :param workbook: Excel Workbook
        :param timeout: expiration handed to the underlying storage
        :return: unique identifier of the stored data
        """
        # Serialize the workbook into an in-memory stream so its raw bytes can be read back
        stream = io.BytesIO()
        try:
            workbook.save(stream)
            raw_bytes = stream.getvalue()
        finally:
            stream.close()

        # Fresh identifier, used later to look the data up again
        storage_key = generate_uuid()

        # The bytes are stored as base64 text
        self.storage.set(storage_key, base64.b64encode(raw_bytes).decode("utf-8"), timeout)
        return storage_key

    def get_once(self, temporary_storage_id: str) -> Workbook:
        """
        Read the data stored under the given identifier and turn it back into an Excel Workbook;
        the stored entry is deleted as soon as it has been read

        :param temporary_storage_id: unique identifier of the stored data
        :return: Excel Workbook
        :raises ValueError: when nothing (or an empty value) is stored under the identifier
        """
        payload = self.storage.get(temporary_storage_id)
        if payload:
            # Read succeeded: drop the entry right away instead of waiting for it to expire
            self.storage.delete(temporary_storage_id)
            return load_workbook(filename=io.BytesIO(base64.b64decode(payload)))

        raise ValueError(f"data(id={temporary_storage_id}) not found in temporary storage")
3 changes: 3 additions & 0 deletions src/bk-user/bkuser/common/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import functools

from blue_krill.data_types.enum import EnumField, StructuredEnum
Expand Down Expand Up @@ -38,6 +39,8 @@ class CacheKeyPrefixEnum(str, StructuredEnum):
VERIFICATION_CODE = "vc"
# 用户重置密码用 Token
RESET_PASSWORD_TOKEN = "rpt"
# Workbook 临时存储
WORKBOOK_TEMPORARY_STORE = "wts"


def _default_key_function(*args, **kwargs):
Expand Down
27 changes: 27 additions & 0 deletions src/bk-user/tests/apis/web/data_source/test_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
from bkuser.apps.data_source.models import DataSource, DataSourceDepartment, DataSourceSensitiveInfo, DataSourceUser
from bkuser.apps.idp.constants import INVALID_REAL_DATA_SOURCE_ID, IdpStatus
from bkuser.apps.idp.models import Idp, IdpSensitiveInfo
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask
from bkuser.plugins.constants import DataSourcePluginEnum
from bkuser.plugins.local.constants import PasswordGenerateMethod
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test.utils import override_settings
from django.urls import reverse
from rest_framework import status

Expand Down Expand Up @@ -441,3 +446,25 @@ def test_retrieve_other_tenant_data_source_sync_record(self, api_client, data_so
other_tenant_task = data_source_sync_tasks[2]
resp = api_client.get(reverse("data_source.sync_record.retrieve", kwargs={"id": other_tenant_task.id}))
assert resp.status_code == status.HTTP_404_NOT_FOUND


class TestDataSourceImportApi:
    """API-level test for importing a data source from an uploaded Excel file."""

    def test_data_source_import_success(self, api_client, data_source):
        # Celery runs eagerly here, so the sync task has already executed when the request returns
        with override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True):
            with open(settings.BASE_DIR / "tests/assets/fake_users.xlsx", "rb") as excel_file:
                excel_bytes = excel_file.read()

            upload = SimpleUploadedFile(
                name="fake_users.xlsx",
                content=excel_bytes,
                content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            )
            import_url = reverse("data_source.import_from_excel", kwargs={"id": data_source.id})
            form_data = {"overwrite": False, "incremental": True, "file": upload}
            resp = api_client.post(import_url, data=form_data, format="multipart")
            sync_task = DataSourceSyncTask.objects.get(data_source=data_source)

        assert resp.status_code == status.HTTP_200_OK
        assert sync_task.status == SyncTaskStatus.SUCCESS
        # The import must have produced both users and departments for this data source
        assert DataSourceUser.objects.filter(data_source_id=data_source.id).exists()
        assert DataSourceDepartment.objects.filter(data_source_id=data_source.id).exists()
20 changes: 20 additions & 0 deletions src/bk-user/tests/apps/sync/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
specific language governing permissions and limitations under the License.
"""

import base64
import io

import pytest
from bkuser.apps.sync.constants import SyncTaskStatus, SyncTaskTrigger
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from django.conf import settings
from django.utils import timezone
from openpyxl.reader.excel import load_workbook
from openpyxl.workbook import Workbook


@pytest.fixture()
Expand Down Expand Up @@ -42,3 +48,17 @@ def tenant_sync_task(full_local_data_source, default_tenant, data_source_sync_ta
start_at=timezone.now(),
extras={"async_run": False},
)


@pytest.fixture()
def user_workbook() -> Workbook:
    """Workbook loaded from the fake_users.xlsx test asset."""
    asset_path = settings.BASE_DIR / "tests/assets/fake_users.xlsx"
    return load_workbook(asset_path)


@pytest.fixture()
def encoded_file(user_workbook) -> str:
    """Base64 text of the user_workbook fixture after saving it to an in-memory byte stream."""
    stream = io.BytesIO()
    try:
        user_workbook.save(stream)
        raw_bytes = stream.getvalue()
    finally:
        stream.close()

    return base64.b64encode(raw_bytes).decode("utf-8")
60 changes: 60 additions & 0 deletions src/bk-user/tests/apps/sync/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import pytest
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.tasks import sync_data_source
from bkuser.apps.sync.workbook_temp_store import WorkbookTempStore

pytestmark = pytest.mark.django_db


class TestSyncDataSource:
    """Tests for the sync_data_source task when the workbook comes from temporary storage."""

    def test_success(self, data_source_sync_task, user_workbook):
        # Workbook is present in temporary storage, so the sync is expected to succeed
        storage_id = WorkbookTempStore().save(user_workbook)

        sync_data_source(data_source_sync_task.id, {"temporary_storage_id": storage_id})

        data_source_sync_task.refresh_from_db()
        assert data_source_sync_task.status == SyncTaskStatus.SUCCESS

    def test_file_not_found(self, data_source_sync_task):
        # The identifier was never stored, so the task must be marked as failed
        task_id = data_source_sync_task.id

        sync_data_source(task_id, {"temporary_storage_id": "non_existing_key"})

        data_source_sync_task.refresh_from_db()
        expected_log = f"data source sync task {task_id} require raw data in temporary storage, but not found"
        assert data_source_sync_task.status == SyncTaskStatus.FAILED
        assert expected_log in data_source_sync_task.logs

    def test_file_not_get(self, data_source_sync_task, user_workbook):
        # The entry is consumed by get_once before the task runs, so the task finds nothing
        task_id = data_source_sync_task.id
        store = WorkbookTempStore()
        storage_id = store.save(user_workbook)
        store.get_once(storage_id)

        sync_data_source(task_id, {"temporary_storage_id": storage_id})

        data_source_sync_task.refresh_from_db()
        expected_log = f"data source sync task {task_id} require raw data in temporary storage, but not found"
        assert data_source_sync_task.status == SyncTaskStatus.FAILED
        assert expected_log in data_source_sync_task.logs