
Commit e60f78a

(feat:storage) file uploads
1 parent 637d3a2 commit e60f78a

File tree

2 files changed: +133 -100 lines changed

application/api/user/routes.py
application/worker.py

application/api/user/routes.py

+54 -50
@@ -413,81 +413,85 @@ def post(self):
         user = secure_filename(decoded_token.get("sub"))
         job_name = secure_filename(request.form["name"])
+
         try:
-            save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
-            os.makedirs(save_dir, exist_ok=True)
-
+            from application.storage.storage_creator import StorageCreator
+            storage = StorageCreator.get_storage()
+
+            base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}"
+
             if len(files) > 1:
-                temp_dir = os.path.join(save_dir, "temp")
-                os.makedirs(temp_dir, exist_ok=True)
-
+                temp_files = []
                 for file in files:
                     filename = secure_filename(file.filename)
-                    file.save(os.path.join(temp_dir, filename))
+                    temp_path = f"{base_path}/temp/{filename}"
+                    storage.save_file(file, temp_path)
+                    temp_files.append(temp_path)
                     print(f"Saved file: {filename}")
-                zip_path = shutil.make_archive(
-                    base_name=os.path.join(save_dir, job_name),
-                    format="zip",
-                    root_dir=temp_dir,
-                )
-                final_filename = os.path.basename(zip_path)
-                shutil.rmtree(temp_dir)
+
+                zip_filename = f"{job_name}.zip"
+                zip_path = f"{base_path}/{zip_filename}"
+
+                def create_zip_archive(temp_paths):
+                    import tempfile
+                    with tempfile.TemporaryDirectory() as temp_dir:
+                        for path in temp_paths:
+                            file_data = storage.get_file(path)
+                            with open(os.path.join(temp_dir, os.path.basename(path)), 'wb') as f:
+                                f.write(file_data.read())
+
+                        # Create the zip outside temp_dir so it is not swept
+                        # into its own archive and survives the cleanup on exit
+                        zip_temp = shutil.make_archive(
+                            base_name=os.path.join(tempfile.gettempdir(), job_name),
+                            format="zip",
+                            root_dir=temp_dir,
+                        )
+
+                    return zip_temp
+
+                zip_temp_path = create_zip_archive(temp_files)
+                with open(zip_temp_path, 'rb') as zip_file:
+                    storage.save_file(zip_file, zip_path)
+                os.remove(zip_temp_path)  # drop the local copy once uploaded
+
+                # Clean up temp files
+                for temp_path in temp_files:
+                    storage.delete_file(temp_path)
 
                 task = ingest.delay(
                     settings.UPLOAD_FOLDER,
                     [
-                        ".rst",
-                        ".md",
-                        ".pdf",
-                        ".txt",
-                        ".docx",
-                        ".csv",
-                        ".epub",
-                        ".html",
-                        ".mdx",
-                        ".json",
-                        ".xlsx",
-                        ".pptx",
-                        ".png",
-                        ".jpg",
-                        ".jpeg",
+                        ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                        ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                        ".jpg", ".jpeg",
                     ],
                     job_name,
-                    final_filename,
+                    zip_filename,
                     user,
                 )
             else:
+                # For a single file
                 file = files[0]
-                final_filename = secure_filename(file.filename)
-                file_path = os.path.join(save_dir, final_filename)
-                file.save(file_path)
-
+                filename = secure_filename(file.filename)
+                file_path = f"{base_path}/{filename}"
+
+                storage.save_file(file, file_path)
+
                 task = ingest.delay(
                     settings.UPLOAD_FOLDER,
                     [
-                        ".rst",
-                        ".md",
-                        ".pdf",
-                        ".txt",
-                        ".docx",
-                        ".csv",
-                        ".epub",
-                        ".html",
-                        ".mdx",
-                        ".json",
-                        ".xlsx",
-                        ".pptx",
-                        ".png",
-                        ".jpg",
-                        ".jpeg",
+                        ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                        ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                        ".jpg", ".jpeg",
                     ],
                     job_name,
-                    final_filename,
+                    filename,
                     user,
                 )
 
         except Exception as err:
             current_app.logger.error(f"Error uploading file: {err}")
             return make_response(jsonify({"success": False}), 400)
+
         return make_response(jsonify({"success": True, "task_id": task.id}), 200)
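Note: the storage abstraction itself is not part of this diff, only its call sites: StorageCreator.get_storage(), storage.save_file(file, path), storage.get_file(path) (which returns a readable stream), and storage.delete_file(path). As a rough mental model, a minimal local-filesystem implementation consistent with those calls might look like the sketch below; the LocalStorage class and everything inside it are assumptions inferred from usage, not code from this commit.

# Hypothetical sketch of the storage interface implied by the calls above.
import io
import os
import shutil

class LocalStorage:
    """Filesystem-backed stand-in for the implied storage interface."""

    def __init__(self, base_dir="."):
        self.base_dir = base_dir

    def save_file(self, file_obj, path):
        # file_obj is any readable binary stream (e.g. a Werkzeug FileStorage)
        full = os.path.join(self.base_dir, path)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as out:
            shutil.copyfileobj(file_obj, out)

    def get_file(self, path):
        # Return a file-like object, matching the .read() usage in the diff
        with open(os.path.join(self.base_dir, path), "rb") as f:
            return io.BytesIO(f.read())

    def delete_file(self, path):
        os.remove(os.path.join(self.base_dir, path))

class StorageCreator:
    @staticmethod
    def get_storage():
        # A real implementation would presumably choose a backend
        # (local disk, S3, ...) based on settings
        return LocalStorage()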

application/worker.py

+79 -50
@@ -133,62 +133,91 @@ def ingest_worker(
     limit = None
     exclude = True
     sample = False
+
+    storage = StorageCreator.get_storage()
+
     full_path = os.path.join(directory, user, name_job)
-
+    source_file_path = os.path.join(full_path, filename)
+
     logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
-    file_data = {"name": name_job, "file": filename, "user": user}
-
-    if not os.path.exists(full_path):
-        os.makedirs(full_path)
-    download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
-
-    # check if file is .zip and extract it
-    if filename.endswith(".zip"):
-        extract_zip_recursive(
-            os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
-        )
-
-    self.update_state(state="PROGRESS", meta={"current": 1})
-
-    raw_docs = SimpleDirectoryReader(
-        input_dir=full_path,
-        input_files=input_files,
-        recursive=recursive,
-        required_exts=formats,
-        num_files_limit=limit,
-        exclude_hidden=exclude,
-        file_metadata=metadata_from_filename,
-    ).load_data()
-
-    chunker = Chunker(
-        chunking_strategy="classic_chunk",
-        max_tokens=MAX_TOKENS,
-        min_tokens=MIN_TOKENS,
-        duplicate_headers=False
-    )
-    raw_docs = chunker.chunk(documents=raw_docs)
-
-    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-    id = ObjectId()
-
-    embed_and_store_documents(docs, full_path, id, self)
-    tokens = count_tokens_docs(docs)
-    self.update_state(state="PROGRESS", meta={"current": 100})
-
-    if sample:
-        for i in range(min(5, len(raw_docs))):
-            logging.info(f"Sample document {i}: {raw_docs[i]}")
-
-    file_data.update({
-        "tokens": tokens,
-        "retriever": retriever,
-        "id": str(id),
-        "type": "local",
-    })
-    upload_index(full_path, file_data)
-
-    # delete local
-    shutil.rmtree(full_path)
+
+    # Work inside a temporary directory that is cleaned up automatically
+    with tempfile.TemporaryDirectory() as temp_dir:
+        try:
+            # Download the file from storage into the temp directory
+            temp_file_path = os.path.join(temp_dir, filename)
+            file_data = storage.get_file(source_file_path)
+
+            with open(temp_file_path, 'wb') as f:
+                f.write(file_data.read())
+
+            self.update_state(state="PROGRESS", meta={"current": 1})
+
+            # Handle zip files
+            if filename.endswith('.zip'):
+                logging.info(f"Extracting zip file: {filename}")
+                extract_zip_recursive(
+                    temp_file_path,
+                    temp_dir,
+                    current_depth=0,
+                    max_depth=RECURSION_DEPTH
+                )
+
+            if sample:
+                logging.info(f"Sample mode enabled. Using {limit} documents.")
+
+            reader = SimpleDirectoryReader(
+                input_dir=temp_dir,
+                input_files=input_files,
+                recursive=recursive,
+                required_exts=formats,
+                exclude_hidden=exclude,
+                file_metadata=metadata_from_filename,
+            )
+            raw_docs = reader.load_data()
+
+            chunker = Chunker(
+                chunking_strategy="classic_chunk",
+                max_tokens=MAX_TOKENS,
+                min_tokens=MIN_TOKENS,
+                duplicate_headers=False
+            )
+            raw_docs = chunker.chunk(documents=raw_docs)
+
+            docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+
+            id = ObjectId()
+
+            vector_store_path = os.path.join(temp_dir, 'vector_store')
+            os.makedirs(vector_store_path, exist_ok=True)
+
+            embed_and_store_documents(docs, vector_store_path, id, self)
+
+            tokens = count_tokens_docs(docs)
+
+            self.update_state(state="PROGRESS", meta={"current": 100})
+
+            if sample:
+                for i in range(min(5, len(raw_docs))):
+                    logging.info(f"Sample document {i}: {raw_docs[i]}")
+
+            file_data = {
+                "name": name_job,
+                "file": filename,
+                "user": user,
+                "tokens": tokens,
+                "retriever": retriever,
+                "id": str(id),
+                "type": "local",
+            }
+
+            upload_index(vector_store_path, file_data)
+
+        except Exception as e:
+            logging.error(f"Error in ingest_worker: {e}", exc_info=True)
+            raise
 
     return {
         "directory": directory,