
Commit 110f4fa

Gcs upload folder hashed (#453)
* Implement hashed folder names for GCS bucket upload
* Raise exception if an invalid model is selected
* Folder name for GCS upload

Co-authored-by: aashipandya <156318202+aashipandya@users.noreply.github.com>
1 parent 9b00547 commit 110f4fa

File tree

backend/score.py
backend/src/document_sources/gcs_bucket.py
backend/src/generate_graphDocuments_from_llm.py
backend/src/graphDB_dataAccess.py
backend/src/main.py
backend/src/shared/common_fn.py

6 files changed: +46 -42 lines changed

backend/score.py

Lines changed: 5 additions & 4 deletions
@@ -162,7 +162,7 @@ async def extract_knowledge_graph_from_file(
             merged_file_path = os.path.join(MERGED_DIR,file_name)
             logging.info(f'File path:{merged_file_path}')
             result = await asyncio.to_thread(
-                extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship)
+                extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship, uri)

         elif source_type == 's3 bucket' and source_url:
             result = await asyncio.to_thread(
@@ -192,7 +192,8 @@ async def extract_knowledge_graph_from_file(
         graphDb_data_Access.update_exception_db(file_name,error_message)
         gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
         if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
-            delete_file_from_gcs(BUCKET_UPLOAD,file_name)
+            folder_name = create_gcs_bucket_folder_name_hashed
+            delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
         else:
             logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
             delete_uploaded_local_file(merged_file_path,file_name)
@@ -432,7 +433,7 @@ async def delete_document_and_entities(uri=Form(None),
     try:
         graph = create_graph_database_connection(uri, userName, password, database)
         graphDb_data_Access = graphDBdataAccess(graph)
-        result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR)
+        result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR, uri)
         entities_count = result[0]['deletedEntities'] if 'deletedEntities' in result[0] else 0
         message = f"Deleted {files_list_size} documents with {entities_count} entities from database"
         josn_obj = {'api_name':'delete_document_and_entities','db_url':uri}
@@ -485,7 +486,7 @@ async def get_document_status(file_name, url, userName, password, database):
 async def cancelled_job(uri=Form(None), userName=Form(None), password=Form(None), database=Form(None), filenames=Form(None), source_types=Form(None)):
     try:
         graph = create_graph_database_connection(uri, userName, password, database)
-        result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR)
+        result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR, uri)

         return create_api_response('Success',message=result)
     except Exception as e:
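Note: these endpoints now thread the Neo4j connection uri through to the helpers so they can recompute the per-file GCS folder. A minimal sketch of that cleanup step, assuming the helper is called with (uri, file_name) as at the other call sites in this commit; the wrapper function name below is illustrative.

# Sketch only -- mirrors the cleanup performed in graphDB_dataAccess.py and main.py below.
from src.shared.common_fn import create_gcs_bucket_folder_name_hashed
from src.document_sources.gcs_bucket import delete_file_from_gcs
from src.shared.constants import BUCKET_UPLOAD

def cleanup_gcs_cached_pdf(uri: str, file_name: str) -> None:
    # Recompute the hashed folder from the connection URI and file name,
    # then remove the cached copy stored under that folder.
    folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
    delete_file_from_gcs(BUCKET_UPLOAD, folder_name, file_name)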

backend/src/document_sources/gcs_bucket.py

Lines changed: 15 additions & 26 deletions
@@ -42,7 +42,7 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
 def load_pdf(file_path):
     return PyMuPDFLoader(file_path)

-def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None):
+def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None, folder_name_sha1_hashed = None):

     if gcs_bucket_folder is not None:
         if gcs_bucket_folder.endswith('/'):
@@ -59,6 +59,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
     #creds= Credentials(access_token)
     if access_token is None:
         storage_client = storage.Client(project=gcs_project_id)
+        blob_name = folder_name_sha1_hashed +'/'+gcs_blob_filename
     else:
         creds= Credentials(access_token)
         storage_client = storage.Client(project=gcs_project_id, credentials=creds)
@@ -79,67 +80,55 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
         raise Exception('Blob Not Found')
     return gcs_blob_filename, pages

-def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name):
+def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name, folder_name_sha1_hashed):
     try:
         storage_client = storage.Client()

         file_name = f'{original_file_name}_part_{chunk_number}'
         bucket = storage_client.bucket(bucket_name)
         file_data = file_chunk.file.read()
-        # print(f'data after read {file_data}')
-
-        blob = bucket.blob(file_name)
+        file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+file_name
+        logging.info(f'GCS folder pathin upload: {file_name_with__hashed_folder}')
+        blob = bucket.blob(file_name_with__hashed_folder)
         file_io = io.BytesIO(file_data)
         blob.upload_from_file(file_io)
-        # Define the lifecycle rule to delete objects after 6 hours
-        # rule = {
-        #     "action": {"type": "Delete"},
-        #     "condition": {"age": 1}  # Age in days (24 hours = 1 days)
-        # }
-
-        # # Get the current lifecycle policy
-        # lifecycle = list(bucket.lifecycle_rules)
-
-        # # Add the new rule
-        # lifecycle.append(rule)
-
-        # # Set the lifecycle policy on the bucket
-        # bucket.lifecycle_rules = lifecycle
-        # bucket.patch()
         time.sleep(1)
         logging.info('Chunk uploaded successfully in gcs')
     except Exception as e:
         raise Exception('Error in while uploading the file chunks on GCS')

-def merge_file_gcs(bucket_name, original_file_name: str):
+def merge_file_gcs(bucket_name, original_file_name: str, folder_name_sha1_hashed):
     try:
         storage_client = storage.Client()
         # Retrieve chunks from GCS
-        blobs = storage_client.list_blobs(bucket_name, prefix=f"{original_file_name}_part_")
+        blobs = storage_client.list_blobs(bucket_name, prefix=folder_name_sha1_hashed)
         chunks = []
         for blob in blobs:
             chunks.append(blob.download_as_bytes())
             blob.delete()

         # Merge chunks into a single file
         merged_file = b"".join(chunks)
-        blob = storage_client.bucket(bucket_name).blob(original_file_name)
+        file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+original_file_name
+        logging.info(f'GCS folder path in merge: {file_name_with__hashed_folder}')
+        blob = storage_client.bucket(bucket_name).blob(file_name_with__hashed_folder)
         logging.info('save the merged file from chunks in gcs')
         file_io = io.BytesIO(merged_file)
         blob.upload_from_file(file_io)
-        pdf_reader = PdfReader(file_io)
+        # pdf_reader = PdfReader(file_io)
         file_size = len(merged_file)
         # total_pages = len(pdf_reader.pages)

         return file_size
     except Exception as e:
         raise Exception('Error in while merge the files chunks on GCS')

-def delete_file_from_gcs(bucket_name, file_name):
+def delete_file_from_gcs(bucket_name,folder_name, file_name):
     try:
         storage_client = storage.Client()
         bucket = storage_client.bucket(bucket_name)
-        blob = bucket.blob(file_name)
+        folder_file_name = folder_name +'/'+file_name
+        blob = bucket.blob(folder_file_name)
         if blob.exists():
             blob.delete()
             logging.info('File deleted from GCS successfully')
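With this change every cached object lives under a folder named by the SHA-1 of the connection URI plus the file name, and merge_file_gcs lists chunks by that folder prefix rather than by the "<name>_part_" prefix, so same-named files from different connections no longer share a blob path. A small sketch of the resulting blob names; the URI and file name values are illustrative.

from src.shared.common_fn import create_gcs_bucket_folder_name_hashed

uri = "neo4j+s://example.databases.neo4j.io"   # hypothetical connection URI
file_name = "report.pdf"                       # hypothetical upload

folder = create_gcs_bucket_folder_name_hashed(uri, file_name)   # 40-character hex digest

chunk_blob  = f"{folder}/{file_name}_part_3"   # written by upload_file_to_gcs
merged_blob = f"{folder}/{file_name}"          # written by merge_file_gcs, read back by get_documents_from_gcs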

backend/src/generate_graphDocuments_from_llm.py

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ def generate_graphDocuments(model: str, graph: Neo4jGraph, chunkId_chunkDoc_list

     elif model in GROQ_MODELS :
         graph_documents = get_graph_from_Groq_Llama3(MODEL_VERSIONS[model], graph, chunkId_chunkDoc_list, allowedNodes, allowedRelationship)
+    else:
+        raise Exception('Invalid LLM Model')

     logging.info(f"graph_documents = {len(graph_documents)}")
     return graph_documents

backend/src/graphDB_dataAccess.py

Lines changed: 4 additions & 3 deletions
@@ -2,7 +2,7 @@
 import os
 from datetime import datetime
 from langchain_community.graphs import Neo4jGraph
-from src.shared.common_fn import delete_uploaded_local_file
+from src.shared.common_fn import create_gcs_bucket_folder_name_hashed, delete_uploaded_local_file
 from src.document_sources.gcs_bucket import delete_file_from_gcs
 from src.shared.constants import BUCKET_UPLOAD
 from src.entities.source_node import sourceNode
@@ -167,7 +167,7 @@ def get_current_status_document_node(self, file_name):
         param = {"file_name" : file_name}
         return self.execute_query(query, param)

-    def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, merged_dir:str):
+    def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, merged_dir:str, uri):
         # filename_list = filenames.split(',')
         filename_list= list(map(str.strip, json.loads(filenames)))
         source_types_list= list(map(str.strip, json.loads(source_types)))
@@ -176,7 +176,8 @@ def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, me
         for (file_name,source_type) in zip(filename_list, source_types_list):
             merged_file_path = os.path.join(merged_dir, file_name)
             if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
-                delete_file_from_gcs(BUCKET_UPLOAD,file_name)
+                folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
+                delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
             else:
                 logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
                 delete_uploaded_local_file(merged_file_path,file_name)

backend/src/main.py

Lines changed: 14 additions & 9 deletions
@@ -155,18 +155,19 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
         lst_file_name.append({'fileName':obj_source_node.file_name,'fileSize':obj_source_node.file_size,'url':obj_source_node.url, 'language':obj_source_node.language, 'status':'Success'})
     return lst_file_name,success_count,failed_count

-def extract_graph_from_file_local_file(graph, model, merged_file_path, fileName, allowedNodes, allowedRelationship):
+def extract_graph_from_file_local_file(graph, model, merged_file_path, fileName, allowedNodes, allowedRelationship,uri):

     logging.info(f'Process file name :{fileName}')
     gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
     if gcs_file_cache == 'True' and (fileName.split('.')[-1]).upper() =='PDF':
-        file_name, pages = get_documents_from_gcs( PROJECT_ID, BUCKET_UPLOAD, None, fileName)
+        folder_name = create_gcs_bucket_folder_name_hashed(uri, fileName)
+        file_name, pages = get_documents_from_gcs( PROJECT_ID, BUCKET_UPLOAD, None, fileName, folder_name_sha1_hashed=folder_name)
     else:
         file_name, pages, file_extension = get_documents_from_file_by_path(merged_file_path,fileName)
     if pages==None or len(pages)==0:
         raise Exception(f'Pdf content is not available for file : {file_name}')

-    return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path)
+    return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, True, merged_file_path, uri)

 def extract_graph_from_file_s3(graph, model, source_url, aws_access_key_id, aws_secret_access_key, allowedNodes, allowedRelationship):

@@ -206,7 +207,7 @@ def extract_graph_from_file_gcs(graph, model, gcs_project_id, gcs_bucket_name, g

     return processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship)

-def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None):
+def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelationship, is_uploaded_from_local=None, merged_file_path=None, uri=None):
     """
     Extracts a Neo4jGraph from a PDF file based on the model.

@@ -308,7 +309,8 @@ def processing_source(graph, model, file_name, pages, allowedNodes, allowedRelat
     if is_uploaded_from_local:
         gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
         if gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
-            delete_file_from_gcs(BUCKET_UPLOAD,file_name)
+            folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
+            delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
         else:
             delete_uploaded_local_file(merged_file_path, file_name)

@@ -423,7 +425,8 @@ def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, origina
     logging.info(f'gcs file cache: {gcs_file_cache}')

     if gcs_file_cache == 'True' and (originalname.split('.')[-1]).upper() =='PDF':
-        upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD)
+        folder_name = create_gcs_bucket_folder_name_hashed(uri,originalname)
+        upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD, folder_name)
     else:
         if not os.path.exists(chunk_dir):
             os.mkdir(chunk_dir)
@@ -437,7 +440,8 @@ def upload_file(graph, model, chunk, chunk_number:int, total_chunks:int, origina
     if int(chunk_number) == int(total_chunks):
         # If this is the last chunk, merge all chunks into a single file
         if gcs_file_cache == 'True' and (originalname.split('.')[-1]).upper()=='PDF':
-            total_pages, file_size = merge_file_gcs(BUCKET_UPLOAD, originalname)
+            file_size = merge_file_gcs(BUCKET_UPLOAD, originalname, folder_name)
+            total_pages = 1
         else:
             total_pages, file_size = merge_chunks_local(originalname, int(total_chunks), chunk_dir, merged_dir)

@@ -474,7 +478,7 @@ def get_labels_and_relationtypes(graph):
     result=[]
     return result

-def manually_cancelled_job(graph, filenames, source_types, merged_dir):
+def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):

     filename_list= list(map(str.strip, json.loads(filenames)))
     source_types_list= list(map(str.strip, json.loads(source_types)))
@@ -491,7 +495,8 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir):
         obj_source_node = None
         merged_file_path = os.path.join(merged_dir, file_name)
         if source_type == 'local file' and gcs_file_cache == 'True' and (file_name.split('.')[-1]).upper()=='PDF':
-            delete_file_from_gcs(BUCKET_UPLOAD,file_name)
+            folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
+            delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
         else:
             logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
             delete_uploaded_local_file(merged_file_path,file_name)
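Taken together, upload_file now computes the hashed folder once per chunk and reuses it when the final chunk triggers the merge, and merge_file_gcs returns only the merged size, with total_pages set to 1 as a placeholder. A condensed sketch of that flow, assuming the repo's helpers are importable; the wrapper function name is illustrative.

from src.shared.common_fn import create_gcs_bucket_folder_name_hashed
from src.document_sources.gcs_bucket import upload_file_to_gcs, merge_file_gcs
from src.shared.constants import BUCKET_UPLOAD

def cache_pdf_chunk_in_gcs(chunk, chunk_number: int, total_chunks: int, originalname: str, uri: str):
    # Chunks for the same (uri, file) pair all land in the same hashed folder.
    folder_name = create_gcs_bucket_folder_name_hashed(uri, originalname)
    upload_file_to_gcs(chunk, chunk_number, originalname, BUCKET_UPLOAD, folder_name)
    if int(chunk_number) == int(total_chunks):
        # The merge now returns only the merged file size; the page count is filled in upstream.
        return merge_file_gcs(BUCKET_UPLOAD, originalname, folder_name)
    return None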

backend/src/shared/common_fn.py

Lines changed: 6 additions & 0 deletions
@@ -1,3 +1,4 @@
+import hashlib
 import logging
 from src.document_sources.youtube import create_youtube_url
 from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings
@@ -144,3 +145,8 @@ def get_llm(model_version:str) :
     logging.info(f"Model created - Model Version: {model_version}")
     return llm

+def create_gcs_bucket_folder_name_hashed(uri, file_name):
+    folder_name = uri + file_name
+    folder_name_sha1 = hashlib.sha1(folder_name.encode())
+    folder_name_sha1_hashed = folder_name_sha1.hexdigest()
+    return folder_name_sha1_hashed
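The new helper is a plain SHA-1 hex digest of the URI concatenated with the file name, so the folder name is deterministic for a given connection and file. A quick usage sketch with hypothetical values:

import hashlib

from src.shared.common_fn import create_gcs_bucket_folder_name_hashed

uri = "neo4j+s://example.databases.neo4j.io"   # hypothetical
file_name = "report.pdf"                       # hypothetical

folder = create_gcs_bucket_folder_name_hashed(uri, file_name)
assert folder == hashlib.sha1((uri + file_name).encode()).hexdigest()
assert len(folder) == 40   # length of a SHA-1 hex digest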
