Skip to content

Commit

Permalink
Implement Multithreading in Task Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
cklogic committed May 9, 2024
1 parent 3b77213 commit d004c81
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion conf/service_conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ authentication:
permission:
switch: false
component: false
dataset: false
dataset: false
task_executor_threads: 4
2 changes: 2 additions & 0 deletions rag/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
pass
DOC_MAXIMUM_SIZE = 128 * 1024 * 1024

TASK_EXECUTOR_THREADS = get_base_config("task_executor_threads", 4)

# Logger
LoggerFactory.set_directory(
os.path.join(
Expand Down
13 changes: 10 additions & 3 deletions rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import copy
import re
import sys
import threading
import time
import traceback
from functools import partial

from api.db.services.file2document_service import File2DocumentService
from rag.utils.minio_conn import MINIO
from api.db.db_models import close_connection
from rag.settings import database_logger, SVR_QUEUE_NAME
from rag.settings import database_logger, SVR_QUEUE_NAME, TASK_EXECUTOR_THREADS
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
from multiprocessing import Pool
import numpy as np
Expand Down Expand Up @@ -304,12 +305,18 @@ def main():
r["id"], tk_count, len(cks), timer()-st))


def worker(thread_number):
cron_logger.info("Task worker run : {}".format(str(thread_number)))
while True:
main()


if __name__ == "__main__":
peewee_logger = logging.getLogger('peewee')
peewee_logger.propagate = False
peewee_logger.addHandler(database_logger.handlers[0])
peewee_logger.setLevel(database_logger.level)

while True:
main()
for i in range(TASK_EXECUTOR_THREADS):
t = threading.Thread(target=worker, args=(i,))
t.start()

0 comments on commit d004c81

Please sign in to comment.