Skip to content

Commit

Permalink
add imap
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingWithTim committed Nov 11, 2024
1 parent e0cd21e commit e790f0b
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions fastchat/serve/monitor/clean_chat_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from functools import partial
from math import ceil
from datetime import datetime, timedelta
from tqdm import tqdm
import time
import multiprocessing as mp

Expand Down Expand Up @@ -139,8 +140,14 @@ def clean_chat_data(log_files, action_type, num_parallel):
with mp.Pool(num_parallel) as pool:
# Use partial to pass action_type to get_action_type_data
func = partial(get_action_type_data, action_type=action_type)
file_data = pool.map(
func, log_files, chunksize=ceil(len(log_files) / len(pool._pool))
file_data = list(
tqdm(
pool.imap(
func, log_files, chunksize=ceil(len(log_files) / len(pool._pool))
),
total=len(log_files),
desc="Processing Log Files",
)
)
# filter out Nones as some files may not contain any data belonging to action_type
raw_data = []
Expand All @@ -151,8 +158,14 @@ def clean_chat_data(log_files, action_type, num_parallel):
# Use the multiprocessing Pool
with mp.Pool(num_parallel) as pool:
func = partial(process_data, action_type=action_type)
results = pool.map(
func, raw_data, chunksize=ceil(len(raw_data) / len(pool._pool))
results = list(
tqdm(
pool.imap(
func, raw_data, chunksize=ceil(len(raw_data) / len(pool._pool))
),
total=len(raw_data),
desc="Processing Raw Data",
)
)

# Aggregate results from child processes
Expand All @@ -161,7 +174,7 @@ def clean_chat_data(log_files, action_type, num_parallel):
ct_network_error = 0
all_models = set()
chats = []
for data in results:
for data in tqdm(results):
if "ct_invalid_conv_id" in data:
ct_invalid_conv_id += data["ct_invalid_conv_id"]
continue
Expand Down

0 comments on commit e790f0b

Please sign in to comment.