Skip to content

Commit

Permalink
chore: optimize asynchronous deletion performance of app related data (
Browse files Browse the repository at this point in the history
  • Loading branch information
takatost authored Jul 24, 2024
1 parent c112188 commit 05141ed
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 109 deletions.
1 change: 0 additions & 1 deletion .github/workflows/api-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,5 @@ jobs:
pgvecto-rs
pgvector
chroma
myscale
- name: Test Vector Stores
run: poetry run -C api bash dev/pytest/pytest_vdb.sh
6 changes: 5 additions & 1 deletion api/services/app_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,12 @@ def delete_app(self, app: App) -> None:
"""
db.session.delete(app)
db.session.commit()

# Trigger asynchronous deletion of app and related data
remove_app_and_related_data_task.delay(app.id)
remove_app_and_related_data_task.delay(
tenant_id=app.tenant_id,
app_id=app.id
)

def get_app_meta(self, app_model: App) -> dict:
"""
Expand Down
342 changes: 236 additions & 106 deletions api/tasks/remove_app_and_related_data_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import click
from celery import shared_task
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError

from extensions.ext_database import db
Expand All @@ -25,129 +24,260 @@
RecommendedApp,
Site,
TagBinding,
TraceAppConfig,
)
from models.tools import WorkflowToolProvider
from models.web import PinnedConversation, SavedMessage
from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun


@shared_task(queue='app_deletion', bind=True, max_retries=3)
def remove_app_and_related_data_task(self, app_id: str):
logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green'))
def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green'))
start_at = time.perf_counter()
try:
# Use a transaction to ensure all deletions succeed or none do
with db.session.begin_nested():
# Delete related data
_delete_app_model_configs(app_id)
_delete_app_site(app_id)
_delete_app_api_tokens(app_id)
_delete_installed_apps(app_id)
_delete_recommended_apps(app_id)
_delete_app_annotation_data(app_id)
_delete_app_dataset_joins(app_id)
_delete_app_workflows(app_id)
_delete_app_conversations(app_id)
_delete_app_messages(app_id)
_delete_workflow_tool_providers(app_id)
_delete_app_tag_bindings(app_id)
_delete_end_users(app_id)

# If we reach here, the transaction was successful
db.session.commit()
# Delete related data
_delete_app_model_configs(tenant_id, app_id)
_delete_app_site(tenant_id, app_id)
_delete_app_api_tokens(tenant_id, app_id)
_delete_installed_apps(tenant_id, app_id)
_delete_recommended_apps(tenant_id, app_id)
_delete_app_annotation_data(tenant_id, app_id)
_delete_app_dataset_joins(tenant_id, app_id)
_delete_app_workflows(tenant_id, app_id)
_delete_app_conversations(tenant_id, app_id)
_delete_app_messages(tenant_id, app_id)
_delete_workflow_tool_providers(tenant_id, app_id)
_delete_app_tag_bindings(tenant_id, app_id)
_delete_end_users(tenant_id, app_id)
_delete_trace_app_configs(tenant_id, app_id)

end_at = time.perf_counter()
logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green'))

except SQLAlchemyError as e:
db.session.rollback()
logging.exception(
click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red'))
raise self.retry(exc=e, countdown=60) # Retry after 60 seconds

except Exception as e:
logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red'))
raise self.retry(exc=e, countdown=60) # Retry after 60 seconds


def _delete_app_model_configs(app_id: str):
db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete()


def _delete_app_site(app_id: str):
db.session.query(Site).filter(Site.app_id == app_id).delete()


def _delete_app_api_tokens(app_id: str):
db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete()


def _delete_installed_apps(app_id: str):
db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete()


def _delete_recommended_apps(app_id: str):
db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete()


def _delete_app_annotation_data(app_id: str):
db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete()
db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete()


def _delete_app_dataset_joins(app_id: str):
db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete()


def _delete_app_workflows(app_id: str):
db.session.query(WorkflowRun).filter(
WorkflowRun.workflow_id.in_(
db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
)
).delete(synchronize_session=False)
db.session.query(WorkflowNodeExecution).filter(
WorkflowNodeExecution.workflow_id.in_(
db.session.query(Workflow.id).filter(Workflow.app_id == app_id)
)
).delete(synchronize_session=False)
db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False)
db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False)


def _delete_app_conversations(app_id: str):
db.session.query(PinnedConversation).filter(
PinnedConversation.conversation_id.in_(
db.session.query(Conversation.id).filter(Conversation.app_id == app_id)
)
).delete(synchronize_session=False)
db.session.query(Conversation).filter(Conversation.app_id == app_id).delete()


def _delete_app_messages(app_id: str):
message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery()
db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete(
synchronize_session=False)
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete(
synchronize_session=False)
db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False)
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete(
synchronize_session=False)
db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False)
db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False)
db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False)


def _delete_workflow_tool_providers(app_id: str):
db.session.query(WorkflowToolProvider).filter(
WorkflowToolProvider.app_id == app_id
).delete(synchronize_session=False)


def _delete_app_tag_bindings(app_id: str):
db.session.query(TagBinding).filter(
TagBinding.target_id == app_id
).delete(synchronize_session=False)


def _delete_end_users(app_id: str):
db.session.query(EndUser).filter(EndUser.app_id == app_id).delete()
def _delete_app_model_configs(tenant_id: str, app_id: str):
def del_model_config(model_config_id: str):
db.session.query(AppModelConfig).filter(AppModelConfig.id == model_config_id).delete(synchronize_session=False)

_delete_records(
"""select id from app_model_configs where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_model_config,
"app model config"
)


def _delete_app_site(tenant_id: str, app_id: str):
def del_site(site_id: str):
db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False)

_delete_records(
"""select id from sites where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_site,
"site"
)


def _delete_app_api_tokens(tenant_id: str, app_id: str):
def del_api_token(api_token_id: str):
db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False)

_delete_records(
"""select id from api_tokens where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_api_token,
"api token"
)


def _delete_installed_apps(tenant_id: str, app_id: str):
def del_installed_app(installed_app_id: str):
db.session.query(InstalledApp).filter(InstalledApp.id == installed_app_id).delete(synchronize_session=False)

_delete_records(
"""select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_installed_app,
"installed app"
)


def _delete_recommended_apps(tenant_id: str, app_id: str):
def del_recommended_app(recommended_app_id: str):
db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete(
synchronize_session=False)

_delete_records(
"""select id from recommended_apps where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_recommended_app,
"recommended app"
)


def _delete_app_annotation_data(tenant_id: str, app_id: str):
def del_annotation_hit_history(annotation_hit_history_id: str):
db.session.query(AppAnnotationHitHistory).filter(
AppAnnotationHitHistory.id == annotation_hit_history_id).delete(synchronize_session=False)

_delete_records(
"""select id from app_annotation_hit_histories where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_annotation_hit_history,
"annotation hit history"
)

def del_annotation_setting(annotation_setting_id: str):
db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete(
synchronize_session=False)

_delete_records(
"""select id from app_annotation_settings where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_annotation_setting,
"annotation setting"
)


def _delete_app_dataset_joins(tenant_id: str, app_id: str):
def del_dataset_join(dataset_join_id: str):
db.session.query(AppDatasetJoin).filter(AppDatasetJoin.id == dataset_join_id).delete(synchronize_session=False)

_delete_records(
"""select id from app_dataset_joins where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_dataset_join,
"dataset join"
)


def _delete_app_workflows(tenant_id: str, app_id: str):
def del_workflow(workflow_id: str):
db.session.query(WorkflowRun).filter(WorkflowRun.workflow_id == workflow_id).delete(synchronize_session=False)
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.workflow_id == workflow_id).delete(
synchronize_session=False)
db.session.query(WorkflowAppLog).filter(WorkflowAppLog.workflow_id == workflow_id).delete(
synchronize_session=False)
db.session.query(Workflow).filter(Workflow.id == workflow_id).delete(synchronize_session=False)

_delete_records(
"""select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_workflow,
"workflow"
)


def _delete_app_conversations(tenant_id: str, app_id: str):
def del_conversation(conversation_id: str):
db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete(
synchronize_session=False)
db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False)

_delete_records(
"""select id from conversations where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_conversation,
"conversation"
)


def _delete_app_messages(tenant_id: str, app_id: str):
def del_message(message_id: str):
db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete(
synchronize_session=False)
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete(
synchronize_session=False)
db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(
synchronize_session=False)
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete(
synchronize_session=False)
db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False)
db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(
synchronize_session=False)
db.session.query(Message).filter(Message.id == message_id).delete()

_delete_records(
"""select id from messages where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_message,
"message"
)


def _delete_workflow_tool_providers(tenant_id: str, app_id: str):
def del_tool_provider(tool_provider_id: str):
db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete(
synchronize_session=False)

_delete_records(
"""select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_tool_provider,
"tool workflow provider"
)


def _delete_app_tag_bindings(tenant_id: str, app_id: str):
def del_tag_binding(tag_binding_id: str):
db.session.query(TagBinding).filter(TagBinding.id == tag_binding_id).delete(synchronize_session=False)

_delete_records(
"""select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_tag_binding,
"tag binding"
)


def _delete_end_users(tenant_id: str, app_id: str):
def del_end_user(end_user_id: str):
db.session.query(EndUser).filter(EndUser.id == end_user_id).delete(synchronize_session=False)

_delete_records(
"""select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_end_user,
"end user"
)


def _delete_trace_app_configs(tenant_id: str, app_id: str):
def del_trace_app_config(trace_app_config_id: str):
db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete(
synchronize_session=False)

_delete_records(
"""select id from trace_app_config where app_id=:app_id limit 1000""",
{"app_id": app_id},
del_trace_app_config,
"trace app config"
)


def _delete_records(query_sql: str, params: dict, delete_func: callable, name: str) -> None:
while True:
with db.engine.begin() as conn:
rs = conn.execute(db.text(query_sql), params)
if rs.rowcount == 0:
break

for i in rs:
record_id = str(i.id)
try:
delete_func(record_id)
db.session.commit()
logging.info(click.style(f"Deleted {name} {record_id}", fg='green'))
except Exception:
logging.exception(f"Error occurred while deleting {name} {record_id}")
continue
rs.close()
Loading

0 comments on commit 05141ed

Please sign in to comment.