
GCS upload folder hashed #453


Merged · 3 commits · Jun 20, 2024
9 changes: 5 additions & 4 deletions backend/score.py
@@ -162,7 +162,7 @@ async def extract_knowledge_graph_from_file(
merged_file_path = os.path.join(MERGED_DIR,file_name)
logging.info(f'File path:{merged_file_path}')
result = await asyncio.to_thread(
extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship)
extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship, uri)

elif source_type == 's3 bucket' and source_url:
result = await asyncio.to_thread(
@@ -192,7 +192,8 @@ async def extract_knowledge_graph_from_file(
graphDb_data_Access.update_exception_db(file_name,error_message)
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
@@ -432,7 +433,7 @@ async def delete_document_and_entities(uri=Form(None),
try:
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR)
result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR, uri)
entities_count = result[0]['deletedEntities'] if 'deletedEntities' in result[0] else 0
message = f"Deleted {files_list_size} documents with {entities_count} entities from database"
josn_obj = {'api_name':'delete_document_and_entities','db_url':uri}
@@ -485,7 +486,7 @@ async def get_document_status(file_name, url, userName, password, database):
async def cancelled_job(uri=Form(None), userName=Form(None), password=Form(None), database=Form(None), filenames=Form(None), source_types=Form(None)):
try:
graph = create_graph_database_connection(uri, userName, password, database)
result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR)
result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR, uri)

return create_api_response('Success',message=result)
except Exception as e:
41 changes: 15 additions & 26 deletions backend/src/document_sources/gcs_bucket.py
@@ -42,7 +42,7 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
def load_pdf(file_path):
return PyMuPDFLoader(file_path)

def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None):
def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None, folder_name_sha1_hashed = None):

if gcs_bucket_folder is not None:
if gcs_bucket_folder.endswith('/'):
@@ -59,6 +59,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
#creds= Credentials(access_token)
if access_token is None:
storage_client = storage.Client(project=gcs_project_id)
blob_name = folder_name_sha1_hashed +'/'+gcs_blob_filename
else:
creds= Credentials(access_token)
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
@@ -79,67 +80,55 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
raise Exception('Blob Not Found')
return gcs_blob_filename, pages

def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name):
def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name, folder_name_sha1_hashed):
try:
storage_client = storage.Client()

file_name = f'{original_file_name}_part_{chunk_number}'
bucket = storage_client.bucket(bucket_name)
file_data = file_chunk.file.read()
# print(f'data after read {file_data}')
blob = bucket.blob(file_name)
file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+file_name
logging.info(f'GCS folder path in upload: {file_name_with__hashed_folder}')
blob = bucket.blob(file_name_with__hashed_folder)
file_io = io.BytesIO(file_data)
blob.upload_from_file(file_io)
# Define the lifecycle rule to delete objects after 6 hours
# rule = {
# "action": {"type": "Delete"},
# "condition": {"age": 1} # Age in days (24 hours = 1 days)
# }

# # Get the current lifecycle policy
# lifecycle = list(bucket.lifecycle_rules)

# # Add the new rule
# lifecycle.append(rule)

# # Set the lifecycle policy on the bucket
# bucket.lifecycle_rules = lifecycle
# bucket.patch()
time.sleep(1)
logging.info('Chunk uploaded successfully in gcs')
except Exception as e:
raise Exception('Error while uploading the file chunks to GCS')

def merge_file_gcs(bucket_name, original_file_name: str):
def merge_file_gcs(bucket_name, original_file_name: str, folder_name_sha1_hashed):
try:
storage_client = storage.Client()
# Retrieve chunks from GCS
blobs = storage_client.list_blobs(bucket_name, prefix=f"{original_file_name}_part_")
blobs = storage_client.list_blobs(bucket_name, prefix=folder_name_sha1_hashed)
chunks = []
for blob in blobs:
chunks.append(blob.download_as_bytes())
blob.delete()

# Merge chunks into a single file
merged_file = b"".join(chunks)
blob = storage_client.bucket(bucket_name).blob(original_file_name)
file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+original_file_name
logging.info(f'GCS folder path in merge: {file_name_with__hashed_folder}')
blob = storage_client.bucket(bucket_name).blob(file_name_with__hashed_folder)
logging.info('save the merged file from chunks in gcs')
file_io = io.BytesIO(merged_file)
blob.upload_from_file(file_io)
pdf_reader = PdfReader(file_io)
# pdf_reader = PdfReader(file_io)
file_size = len(merged_file)
# total_pages = len(pdf_reader.pages)

return file_size
except Exception as e:
raise Exception('Error while merging the file chunks on GCS')

def delete_file_from_gcs(bucket_name, file_name):
def delete_file_from_gcs(bucket_name,folder_name, file_name):
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
folder_file_name = folder_name +'/'+file_name
blob = bucket.blob(folder_file_name)
if blob.exists():
blob.delete()
logging.info('File deleted from GCS successfully')
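As context for the three helpers above (upload, merge, delete): every object for a given upload now lives under a SHA-1 prefix derived from the connection URI and file name, so the chunk parts, the merged PDF, and later deletions all resolve against the same folder. The following is a minimal sketch of that layout, not part of the diff, using a hypothetical bucket name and URI and only standard `google-cloud-storage` calls:

```python
import hashlib
from google.cloud import storage

# Hypothetical inputs for illustration only.
uri = "neo4j+s://example.databases.neo4j.io"
file_name = "report.pdf"
bucket_name = "my-upload-bucket"

# Folder name follows the helper added in this PR: hex SHA-1 of uri + file_name.
folder = hashlib.sha1((uri + file_name).encode()).hexdigest()

# Expected object layout in the cache bucket after upload and merge:
#   <folder>/report.pdf_part_1, <folder>/report.pdf_part_2, ...  (chunk uploads)
#   <folder>/report.pdf                                          (merged file)
client = storage.Client()
for blob in client.list_blobs(bucket_name, prefix=folder):
    print(blob.name)  # every object for this upload shares the hashed prefix

# Deletion addresses the same <folder>/<file_name> path.
client.bucket(bucket_name).blob(f"{folder}/{file_name}").delete()
```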
2 changes: 2 additions & 0 deletions backend/src/generate_graphDocuments_from_llm.py
@@ -35,6 +35,8 @@ def generate_graphDocuments(model: str, graph: Neo4jGraph, chunkId_chunkDoc_list

elif model in GROQ_MODELS :
graph_documents = get_graph_from_Groq_Llama3(MODEL_VERSIONS[model], graph, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
else:
raise Exception('Invalid LLM Model')

logging.info(f"graph_documents = {len(graph_documents)}")
return graph_documents
7 changes: 4 additions & 3 deletions backend/src/graphDB_dataAccess.py
@@ -2,7 +2,7 @@
import os
from datetime import datetime
from langchain_community.graphs import Neo4jGraph
from src.shared.common_fn import delete_uploaded_local_file
from src.shared.common_fn import create_gcs_bucket_folder_name_hashed, delete_uploaded_local_file
from src.document_sources.gcs_bucket import delete_file_from_gcs
from src.shared.constants import BUCKET_UPLOAD
from src.entities.source_node import sourceNode
@@ -167,7 +167,7 @@ def get_current_status_document_node(self, file_name):
param = {"file_name" : file_name}
return self.execute_query(query, param)

def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, merged_dir:str):
def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, merged_dir:str, uri):
# filename_list = filenames.split(',')
filename_list= list(map(str.strip, json.loads(filenames)))
source_types_list= list(map(str.strip, json.loads(source_types)))
@@ -176,7 +176,8 @@ def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, me
for (file_name,source_type) in zip(filename_list, source_types_list):
merged_file_path = os.path.join(merged_dir, file_name)
if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
23 changes: 14 additions & 9 deletions backend/src/main.py
@@ -155,18 +155,19 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
lst_file_name.append({'fileName':obj_source_node.file_name,'fileSize':obj_source_node.file_size,'url':obj_source_node.url, 'language':obj_source_node.language, 'status':'Success'})
return lst_file_name,success_count,failed_count

def extract_graph_from_file_local_file(graph, model, merged_file_path, fileName, allowedNodes, allowedRelationship):
def extract_graph_from_file_local_file(graph, model, merged_file_path, fileName, allowedNodes, allowedRelationship,uri):

logging.info(f'Process file name :{fileName}')
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True' and (fileName.split('.')[-1]).upper() =='PDF':
file_name, pages = get_documents_from_gcs( PROJECT_ID, BUCKET_UPLOAD, None, fileName)
folder_name = create_gcs_bucket_folder_name_hashed(uri, fileName)
file_name, pages = get_documents_from_gcs( PROJECT_ID, BUCKET_UPLOAD, None, fileName, folder_name_sha1_hashed=folder_name)
else:
file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
if pages==None or len(pages)==0:
raise Exception(f'Pdf content is not available for file : {file_name}')

return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path, uri)

def extract_graph_from_file_s3(graph, model, source_url, aws_access_key_id, aws_secret_access_key, allowedNodes, allowedRelationship):

@@ -206,7 +207,7 @@ def extract_graph_from_file_gcs(graph, model, gcs_project_id, gcs_bucket_name, g

return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship)

def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None):
def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, uri=None):
"""
Extracts a Neo4jGraph from a PDF file based on the model.

@@ -308,7 +309,8 @@ def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelat
if is_uploaded_from_local:
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
delete_uploaded_local_file(merged_file_path, file_name)

@@ -423,7 +425,8 @@ def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, origina
logging.info(f'gcs file cache: {gcs_file_cache}')

if gcs_file_cache == 'True' and (originalname.split('.')[-1]).upper() =='PDF':
upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD)
folder_name = create_gcs_bucket_folder_name_hashed(uri,originalname)
upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD, folder_name)
else:
if not os.path.exists(chunk_dir):
os.mkdir(chunk_dir)
@@ -437,7 +440,8 @@ def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, origina
if int(chunk_number) == int(total_chunks):
# If this is the last chunk, merge all chunks into a single file
if gcs_file_cache == 'True' and (originalname.split('.')[-1]).upper()=='PDF':
total_pages, file_size = merge_file_gcs(BUCKET_UPLOAD, originalname)
file_size = merge_file_gcs(BUCKET_UPLOAD, originalname, folder_name)
total_pages = 1
else:
total_pages, file_size = merge_chunks_local(originalname, int(total_chunks), chunk_dir, merged_dir)

@@ -474,7 +478,7 @@ def get_labels_and_relationtypes(graph):
result=[]
return result

def manually_cancelled_job(graph, filenames, source_types, merged_dir):
def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):

filename_list= list(map(str.strip, json.loads(filenames)))
source_types_list= list(map(str.strip, json.loads(source_types)))
@@ -491,7 +495,8 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir):
obj_source_node = None
merged_file_path = os.path.join(merged_dir, file_name)
if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
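Taken together, `main.py` now derives the hashed folder once per (connection URI, file name) pair and passes it to every GCS helper, so upload, merge, extraction, and cleanup all address the same prefix. The following rough sketch (not part of the diff) composes the functions changed in this PR, assuming FastAPI `UploadFile`-style chunk objects whose `.file` attribute is readable:

```python
from src.shared.common_fn import create_gcs_bucket_folder_name_hashed
from src.shared.constants import BUCKET_UPLOAD
from src.document_sources.gcs_bucket import (
    upload_file_to_gcs, merge_file_gcs, delete_file_from_gcs,
)

def cache_pdf_in_gcs(uri, original_name, chunks):
    # One hashed folder per (uri, file name) keeps uploads from different
    # connections/users apart in the shared cache bucket.
    folder_name = create_gcs_bucket_folder_name_hashed(uri, original_name)

    # Each chunk is stored as <folder>/<name>_part_<n>.
    for chunk_number, chunk in enumerate(chunks, start=1):
        upload_file_to_gcs(chunk, chunk_number, original_name, BUCKET_UPLOAD, folder_name)

    # After the last chunk, the parts are merged into <folder>/<name>.
    file_size = merge_file_gcs(BUCKET_UPLOAD, original_name, folder_name)

    # Cleanup recomputes nothing extra: it reuses the same folder + file name.
    delete_file_from_gcs(BUCKET_UPLOAD, folder_name, original_name)
    return file_size
```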
6 changes: 6 additions & 0 deletions backend/src/shared/common_fn.py
@@ -1,3 +1,4 @@
import hashlib
import logging
from src.document_sources.youtube import create_youtube_url
from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings
@@ -144,3 +145,8 @@ def get_llm(model_version:str) :
logging.info(f"Model created - Model Version: {model_version}")
return llm

def create_gcs_bucket_folder_name_hashed(uri, file_name):
folder_name = uri + file_name
folder_name_sha1 = hashlib.sha1(folder_name.encode())
folder_name_sha1_hashed = folder_name_sha1.hexdigest()
return folder_name_sha1_hashed
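The helper is deterministic, which is why `uri` is now threaded through the extract, delete, and cancel paths above: any caller that knows the connection URI and file name can recompute the same folder later. A quick illustrative check with hypothetical inputs:

```python
import hashlib

# Hypothetical inputs for illustration only.
uri = "neo4j+s://example.databases.neo4j.io"
file_name = "report.pdf"

folder = hashlib.sha1((uri + file_name).encode()).hexdigest()

# The same inputs always yield the same 40-character hex folder name,
# so upload, merge, and delete all address <folder>/<file_name>.
assert folder == hashlib.sha1((uri + file_name).encode()).hexdigest()
print(f"{folder}/{file_name}")
```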