Local file upload gcs #442

Merged · 6 commits · Jun 18, 2024
1 change: 1 addition & 0 deletions backend/example.env
@@ -20,3 +20,4 @@ LANGCHAIN_API_KEY = ""
LANGCHAIN_PROJECT = ""
LANGCHAIN_TRACING_V2 = ""
LANGCHAIN_ENDPOINT = ""
GCS_FILE_CACHE = "" # Save the file into GCS or local storage; should be True or False
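
For reference, the backend code later in this PR reads the flag with os.environ.get('GCS_FILE_CACHE') and compares it to the string 'True', so values like 'true' or 'TRUE' silently fall back to local storage. A minimal sketch of a more forgiving check (the helper name is illustrative, not part of this PR):

import os

def is_gcs_file_cache_enabled() -> bool:
    # Accept 'True' in any casing and ignore surrounding whitespace; anything else means local storage.
    return os.environ.get('GCS_FILE_CACHE', '').strip().lower() == 'true'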
6 changes: 3 additions & 3 deletions backend/score.py
@@ -162,7 +162,7 @@ async def extract_knowledge_graph_from_file(
merged_file_path = os.path.join(MERGED_DIR,file_name)
logging.info(f'File path:{merged_file_path}')
result = await asyncio.to_thread(
extract_graph_from_file_local_file, graph, model, file_name, merged_file_path, allowedNodes, allowedRelationship)
extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship)

elif source_type == 's3 bucket' and source_url:
result = await asyncio.to_thread(
@@ -191,7 +191,7 @@ async def extract_knowledge_graph_from_file(
error_message = str(e)
graphDb_data_Access.update_exception_db(file_name,error_message)
if source_type == 'local file':
delete_uploaded_local_file(merged_file_path, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
josn_obj = {'message':message,'error_message':error_message, 'file_name': file_name,'status':'Failed','db_url':uri,'failed_count':1, 'source_type': source_type}
logger.log_struct(josn_obj)
logging.exception(f'File Failed in extraction: {josn_obj}')
@@ -342,7 +342,7 @@ async def upload_large_file_into_chunks(file:UploadFile = File(...), chunkNumber
password=Form(None), database=Form(None)):
try:
graph = create_graph_database_connection(uri, userName, password, database)
result = await asyncio.to_thread(upload_file, graph, model, file, chunkNumber, totalChunks, originalname, CHUNK_DIR, MERGED_DIR)
result = await asyncio.to_thread(upload_file, graph, model, file, chunkNumber, totalChunks, originalname, uri, CHUNK_DIR, MERGED_DIR)
josn_obj = {'api_name':'upload','db_url':uri}
logger.log_struct(josn_obj)
if int(chunkNumber) == int(totalChunks):
71 changes: 67 additions & 4 deletions backend/src/document_sources/gcs_bucket.py
@@ -41,7 +41,7 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
def load_pdf(file_path):
return PyMuPDFLoader(file_path)

def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token):
def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None):

if gcs_bucket_folder is not None:
if gcs_bucket_folder.endswith('/'):
@@ -56,8 +56,12 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
# pages = loader.load()
# file_name = gcs_blob_filename
#creds= Credentials(access_token)
creds= Credentials(access_token)
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
if access_token is None:
storage_client = storage.Client(project=gcs_project_id)
else:
creds= Credentials(access_token)
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
print(f'BLOB Name: {blob_name}')
bucket = storage_client.bucket(gcs_bucket_name)
blob = bucket.blob(blob_name)
content = blob.download_as_bytes()
@@ -70,4 +74,63 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
text += page.extract_text()
pages = [Document(page_content = text)]
return gcs_blob_filename, pages


def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name):
storage_client = storage.Client()

file_name = f'{original_file_name}_part_{chunk_number}'
bucket = storage_client.bucket(bucket_name)
file_data = file_chunk.file.read()
# print(f'data after read {file_data}')

blob = bucket.blob(file_name)
file_io = io.BytesIO(file_data)
blob.upload_from_file(file_io)
# Define a lifecycle rule to delete objects after 1 day (GCS lifecycle age is measured in whole days)
rule = {
"action": {"type": "Delete"},
"condition": {"age": 1} # Age in days
}

# Get the current lifecycle policy
lifecycle = list(bucket.lifecycle_rules)

# Add the new rule
lifecycle.append(rule)

# Set the lifecycle policy on the bucket
bucket.lifecycle_rules = lifecycle
bucket.patch()
logging.info('Chunk uploaded successfully in gcs')

def merge_file_gcs(bucket_name, original_file_name: str):
storage_client = storage.Client()
# Retrieve chunks from GCS
blobs = storage_client.list_blobs(bucket_name, prefix=f"{original_file_name}_part_")
chunks = []
for blob in blobs:
chunks.append(blob.download_as_bytes())
blob.delete()

# Merge chunks into a single file
merged_file = b"".join(chunks)
blob = storage_client.bucket(bucket_name).blob(original_file_name)
logging.info('save the merged file from chunks in gcs')
file_io = io.BytesIO(merged_file)
blob.upload_from_file(file_io)
pdf_reader = PdfReader(file_io)
file_size = len(merged_file)
total_pages = len(pdf_reader.pages)

return total_pages, file_size

def delete_file_from_gcs(bucket_name, file_name):
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
if blob.exists():
blob.delete()
logging.info('File deleted from GCS successfully')
except:
raise Exception('BLOB not exists in GCS')
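
For reference, a minimal end-to-end sketch of the new helpers above, assuming application-default GCS credentials, a local PDF on disk, and that the module is importable as src.document_sources.gcs_bucket; the SimpleNamespace shim stands in for FastAPI's UploadFile, whose raw stream the helpers read via .file:

import io
from types import SimpleNamespace
from src.document_sources.gcs_bucket import upload_file_to_gcs, merge_file_gcs, delete_file_from_gcs

BUCKET = 'llm-graph-builder-upload'  # BUCKET_UPLOAD from src/shared/constants.py in this PR
CHUNK_SIZE = 1 * 1024 * 1024         # matches the new frontend chunkSize

with open('sample.pdf', 'rb') as f:
    data = f.read()

# Split the file the same way the frontend does and upload each part as <name>_part_<n>.
chunks = [data[i:i + CHUNK_SIZE] for i in range(0, len(data), CHUNK_SIZE)]
for number, chunk in enumerate(chunks, start=1):
    upload_file_to_gcs(SimpleNamespace(file=io.BytesIO(chunk)), number, 'sample.pdf', BUCKET)

# Merge the parts back into a single blob and read its page count and size.
total_pages, file_size = merge_file_gcs(BUCKET, 'sample.pdf')
print(total_pages, file_size)

# Clean up the merged blob once processing is done.
delete_file_from_gcs(BUCKET, 'sample.pdf')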
6 changes: 3 additions & 3 deletions backend/src/graphDB_dataAccess.py
@@ -2,8 +2,8 @@
import os
from datetime import datetime
from langchain_community.graphs import Neo4jGraph
from src.shared.common_fn import delete_uploaded_local_file
from src.api_response import create_api_response
from src.document_sources.gcs_bucket import delete_file_from_gcs
from src.shared.constants import BUCKET_UPLOAD
from src.entities.source_node import sourceNode
import json

@@ -175,7 +175,7 @@ def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, me
merged_file_path = os.path.join(merged_dir, file_name)
if source_type == 'local file':
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,file_name)

query_to_delete_document="""
MATCH (d:Document) where d.fileName in $filename_list and d.fileSource in $source_types_list
66 changes: 42 additions & 24 deletions backend/src/main.py
@@ -1,4 +1,5 @@
from langchain_community.graphs import Neo4jGraph
from src.shared.constants import BUCKET_UPLOAD, PROJECT_ID
from src.shared.schema_extraction import sceham_extraction_from_text
from dotenv import load_dotenv
from datetime import datetime
@@ -153,17 +154,19 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
success_count+=1
lst_file_name.append({'fileName':obj_source_node.file_name,'fileSize':obj_source_node.file_size,'url':obj_source_node.url, 'language':obj_source_node.language, 'status':'Success'})
return lst_file_name,success_count,failed_count

def extract_graph_from_file_local_file(graph, model, fileName, merged_file_path, allowedNodes, allowedRelationship):
def extract_graph_from_file_local_file(graph, model, merged_file_path, fileName, allowedNodes, allowedRelationship):

logging.info(f'Process file name :{fileName}')
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
pdf_total_pages = pages[0].metadata['total_pages']

if pages==None or pdf_total_pages==0:
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True' and fileName.split('.')[-1]=='pdf':
file_name, pages = get_documents_from_gcs( PROJECT_ID, BUCKET_UPLOAD, None, fileName)
else:
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
if pages==None or len(pages)==0:
raise Exception(f'Pdf content is not available for file : {file_name}')

return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, merged_file_path)
return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)

def extract_graph_from_file_s3(graph, model, source_url, aws_access_key_id, aws_secret_access_key, allowedNodes, allowedRelationship):

@@ -203,7 +206,7 @@ def extract_graph_from_file_gcs(graph, model, gcs_project_id, gcs_bucket_name, g

return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship)

def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, merged_file_path=None):
def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None):
"""
Extracts a Neo4jGraph from a PDF file based on the model.

@@ -301,8 +304,13 @@ def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelat


# merged_file_path have value only when file uploaded from local
if merged_file_path is not None:
delete_uploaded_local_file(merged_file_path, file_name)

if is_uploaded_from_local:
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True' and file_name.split('.')[-1]=='pdf':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
else:
delete_uploaded_local_file(merged_file_path, file_name)

return {
"fileName": file_name,
@@ -388,7 +396,7 @@ def connection_check(graph):
graph_DB_dataAccess = graphDBdataAccess(graph)
return graph_DB_dataAccess.connection_check()

def merge_chunks(file_name, total_chunks, chunk_dir, merged_dir):
def merge_chunks_local(file_name, total_chunks, chunk_dir, merged_dir):

if not os.path.exists(merged_dir):
os.mkdir(merged_dir)
@@ -405,38 +413,48 @@ def merge_chunks(file_name, total_chunks, chunk_dir, merged_dir):
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,file_name)
pdf_total_pages = pages[0].metadata['total_pages']
file_size = os.path.getsize(merged_file_path)
return file_size, pdf_total_pages, file_extension
return pdf_total_pages,file_size



def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, originalname, chunk_dir, merged_dir):
def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, originalname, uri, chunk_dir, merged_dir):

if not os.path.exists(chunk_dir):
os.mkdir(chunk_dir)
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
logging.info(f'gcs file cache: {gcs_file_cache}')

chunk_file_path = os.path.join(chunk_dir, f"{originalname}_part_{chunk_number}")
logging.info(f'Chunk File Path: {chunk_file_path}')

with open(chunk_file_path, "wb") as chunk_file:
if gcs_file_cache == 'True' and originalname.split('.')[-1]=='pdf':
upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD)
else:
if not os.path.exists(chunk_dir):
os.mkdir(chunk_dir)

chunk_file_path = os.path.join(chunk_dir, f"{originalname}_part_{chunk_number}")
logging.info(f'Chunk File Path: {chunk_file_path}')

with open(chunk_file_path, "wb") as chunk_file:
chunk_file.write(chunk.file.read())

if int(chunk_number) == int(total_chunks):
# If this is the last chunk, merge all chunks into a single file
file_size, pdf_total_pages, file_extension= merge_chunks(originalname, int(total_chunks), chunk_dir, merged_dir)
if gcs_file_cache == 'True' and originalname.split('.')[-1]=='pdf':
total_pages, file_size = merge_file_gcs(BUCKET_UPLOAD, originalname)
else:
total_pages, file_size = merge_chunks_local(originalname, int(total_chunks), chunk_dir, merged_dir)

logging.info("File merged successfully")

file_extension = originalname.split('.')[-1]
obj_source_node = sourceNode()
obj_source_node.file_name = originalname
obj_source_node.file_type = file_extension
obj_source_node.file_size = file_size
obj_source_node.file_source = 'local file'
obj_source_node.model = model
obj_source_node.total_pages = pdf_total_pages
obj_source_node.total_pages = total_pages
obj_source_node.created_at = datetime.now()
graphDb_data_Access = graphDBdataAccess(graph)

graphDb_data_Access.create_source_node(obj_source_node)
return {'file_size': file_size, 'total_pages': pdf_total_pages, 'file_name': originalname, 'file_extension':file_extension, 'message':f"Chunk {chunk_number}/{total_chunks} saved"}
return {'file_size': file_size, 'total_pages': total_pages, 'file_name': originalname, 'file_extension':file_extension, 'message':f"Chunk {chunk_number}/{total_chunks} saved"}
return f"Chunk {chunk_number}/{total_chunks} saved"

def get_labels_and_relationtypes(graph):
@@ -473,7 +491,7 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir):
merged_file_path = os.path.join(merged_dir, file_name)
if source_type == 'local file':
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
return "Cancelled the processing job successfully"

def populate_graph_schema_from_text(text, model, is_schema_description_cheked):
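
Note that the cleanup branch added to processing_source checks both GCS_FILE_CACHE and the .pdf extension before choosing between GCS and local deletion, whereas the other call sites touched by this PR (the error path in score.py, delete_file_from_graph, and manually_cancelled_job) now call delete_file_from_gcs for every local file. A small cache-aware helper in the spirit of that branch could keep those call sites consistent; the function below is only an illustrative sketch, not code from this PR:

import os
from src.document_sources.gcs_bucket import delete_file_from_gcs
from src.shared.common_fn import delete_uploaded_local_file
from src.shared.constants import BUCKET_UPLOAD

def delete_uploaded_source_file(merged_file_path, file_name):
    # Mirror the condition used in processing_source: GCS only when the cache is on and the file is a PDF.
    gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
    if gcs_file_cache == 'True' and file_name.split('.')[-1] == 'pdf':
        delete_file_from_gcs(BUCKET_UPLOAD, file_name)
    else:
        delete_uploaded_local_file(merged_file_path, file_name)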
2 changes: 2 additions & 0 deletions backend/src/shared/constants.py
@@ -13,3 +13,5 @@
CHAT_SEARCH_KWARG_K = 3
CHAT_SEARCH_KWARG_SCORE_THRESHOLD = 0.7
GROQ_MODELS = ["Groq llama3"]
BUCKET_UPLOAD = 'llm-graph-builder-upload'
PROJECT_ID = 'llm-experiments-387609'
2 changes: 1 addition & 1 deletion frontend/src/utils/Constants.ts
@@ -40,7 +40,7 @@ export const defaultLLM = llms?.includes('OpenAI GPT 3.5')
? 'Gemini 1.0 Pro'
: 'Diffbot';

export const chunkSize = process.env.CHUNK_SIZE ? parseInt(process.env.CHUNK_SIZE) : 5 * 1024 * 1024;
export const chunkSize = process.env.CHUNK_SIZE ? parseInt(process.env.CHUNK_SIZE) : 1 * 1024 * 1024;
export const timeperpage = process.env.TIME_PER_PAGE ? parseInt(process.env.TIME_PER_PAGE) : 50;
export const NODES_OPTIONS = [
{