Added support for pyspark in dataprep microservice (opea-project#180)
Signed-off-by: Xinyu Ye <xinyu.ye@intel.com>
XinyuYe-Intel authored Jun 26, 2024
1 parent 8b6486b commit a5eb143
Showing 4 changed files with 31 additions and 1 deletion.
2 changes: 2 additions & 0 deletions comps/dataprep/redis/README.md
@@ -11,6 +11,8 @@ We organized these two folders in the same way, so you can use either framework
 - option 1: Install Single-process version (for 1-10 files processing)
 
 ```bash
+apt update
+apt install default-jre
 # for langchain
 cd langchain
 # for llama_index
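The new JRE requirement exists because PySpark drives a JVM-based Spark runtime even in local mode; without a `java` executable on the PATH, `SparkContext` creation fails at startup. A minimal pre-flight check (a sketch, not part of this commit) could look like:

```python
# Pre-flight sketch (not part of this commit): PySpark launches a JVM
# gateway, so a Java runtime must be discoverable before SparkContext starts.
import shutil

if shutil.which("java") is None:
    raise RuntimeError(
        "No `java` executable found on PATH; install a JRE first, "
        "e.g. `apt install default-jre`."
    )
```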
1 change: 1 addition & 0 deletions comps/dataprep/redis/langchain/docker/Dockerfile
@@ -12,6 +12,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing
     build-essential \
     libgl1-mesa-glx \
     libjemalloc-dev \
+    default-jre \
     vim
 
 RUN useradd -m -s /bin/bash user && \
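Adding `default-jre` to the image covers the same JVM requirement for the containerized service. If the runtime were ever installed somewhere PySpark cannot discover it, one conventional workaround (an assumption, not something this commit does) is to set `JAVA_HOME` to Debian's `default-java` symlink before the context is created:

```python
# Workaround sketch (assumption, not from this commit): point PySpark at the
# JRE that the Debian `default-jre` package installs under /usr/lib/jvm.
import os

os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/default-java")
```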
26 changes: 25 additions & 1 deletion comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -14,6 +14,7 @@
 from langchain_community.vectorstores import Redis
 from langchain_text_splitters import HTMLHeaderTextSplitter
 from langsmith import traceable
+from pyspark import SparkConf, SparkContext
 
 from comps import DocPath, opea_microservices, register_microservice
 from comps.dataprep.utils import document_loader, parse_html
@@ -128,11 +129,34 @@ async def ingest_documents(
         upload_folder = "./uploaded_files/"
         if not os.path.exists(upload_folder):
             Path(upload_folder).mkdir(parents=True, exist_ok=True)
+        uploaded_files = []
         for file in files:
             save_path = upload_folder + file.filename
             await save_file_to_local_disk(save_path, file)
-            ingest_data_to_redis(DocPath(path=save_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+            uploaded_files.append(save_path)
             print(f"Successfully saved file {save_path}")
+
+        def process_files_wrapper(files):
+            if not isinstance(files, list):
+                files = [files]
+            for file in files:
+                ingest_data_to_redis(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+
+        try:
+            # Create a SparkContext
+            conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
+            sc = SparkContext(conf=conf)
+            # Create an RDD with parallel processing
+            parallel_num = min(len(uploaded_files), os.cpu_count())
+            rdd = sc.parallelize(uploaded_files, parallel_num)
+            # Perform a parallel operation
+            rdd_trans = rdd.map(process_files_wrapper)
+            rdd_trans.collect()
+            # Stop the SparkContext
+            sc.stop()
+        except:
+            # Stop the SparkContext
+            sc.stop()
+        return {"status": 200, "message": "Data preparation succeeded"}
 
     if link_list:
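One caveat in the committed error handling: the bare `except:` assumes `sc` was already bound, so if `SparkContext(conf=conf)` itself raises, the handler's `sc.stop()` fails with a `NameError` and the original error is masked; the handler also swallows ingestion failures while the endpoint still reports success. A more defensive variant of the same pattern (a sketch only — `ingest_in_parallel` is a hypothetical helper, and `ingest_data_to_redis`/`DocPath` are assumed from this file) would create the context before the `try` and release it in `finally`:

```python
# Defensive sketch (not the committed code): same parallel-ingest pattern,
# but the SparkContext is always stopped and failures propagate to the caller.
import os

from pyspark import SparkConf, SparkContext


def ingest_in_parallel(uploaded_files, chunk_size, chunk_overlap):
    conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
    # Created outside the try block: if this line fails, there is no context to stop.
    sc = SparkContext(conf=conf)
    try:
        # One partition per file, capped at the number of visible CPUs.
        parallel_num = min(len(uploaded_files), os.cpu_count())
        rdd = sc.parallelize(uploaded_files, parallel_num)
        # Each worker ingests its file; collect() forces the lazy map to run.
        rdd.map(
            lambda path: ingest_data_to_redis(  # assumed from prepare_doc_redis.py
                DocPath(path=path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
            )
        ).collect()
    finally:
        sc.stop()
```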
3 changes: 3 additions & 0 deletions comps/dataprep/redis/langchain/requirements.txt
@@ -7,6 +7,7 @@ langchain
 langchain-community
 langchain-text-splitters
 langsmith
+markdown
 numpy
 opentelemetry-api
 opentelemetry-exporter-otlp
@@ -15,7 +16,9 @@ pandas
 Pillow
 prometheus-fastapi-instrumentator
 pymupdf
+pyspark
 python-docx
+redis
 sentence_transformers
 shortuuid
 unstructured
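
With `pyspark`, `markdown`, and `redis` now declared, a quick way to confirm the environment can actually stand up a local Spark runtime (a minimal smoke test, not part of this change) is:

```python
# Smoke-test sketch (not part of this commit): verify the pyspark dependency
# can start a local context and run a trivial job end to end.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("deps-smoke-test").setMaster("local[1]")
sc = SparkContext(conf=conf)
try:
    assert sc.parallelize(range(4)).sum() == 6  # 0 + 1 + 2 + 3
finally:
    sc.stop()
```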
