
Staging to main #467

Merged on Jun 24, 2024 (5 commits)
3 changes: 2 additions & 1 deletion backend/Dockerfile
@@ -6,6 +6,7 @@ EXPOSE 8000
RUN apt-get update && \
apt-get install -y --no-install-recommends \
libgl1-mesa-glx \
libreoffice \
cmake \
poppler-utils \
tesseract-ocr && \
@@ -19,4 +20,4 @@ RUN pip install --no-cache-dir --upgrade -r requirements.txt
# Copy application code
COPY . /code
# Set command
CMD ["gunicorn", "score:app", "--workers", "2", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "300"]
CMD ["gunicorn", "score:app", "--workers", "8","--threads", "8", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "300"]
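
The CMD change raises per-container concurrency from 2 workers to 8 workers with 8 threads each (up to 64 in-flight requests). As a rough, non-authoritative sanity check, the configured worker count can be compared with gunicorn's usual sizing heuristic for the target machine; a minimal Python sketch:

import multiprocessing

# Sketch only: gunicorn's documented rule of thumb is (2 x cores) + 1 workers;
# compare that with the 8 workers x 8 threads now baked into the image.
configured_workers, configured_threads = 8, 8
suggested_workers = (2 * multiprocessing.cpu_count()) + 1
print(f"configured: {configured_workers} workers x {configured_threads} threads, "
      f"heuristic suggests about {suggested_workers} workers")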
2 changes: 1 addition & 1 deletion backend/requirements.txt
@@ -89,7 +89,7 @@ matplotlib==3.7.2
mpmath==1.3.0
multidict==6.0.5
mypy-extensions==1.0.0
neo4j==5.20.0
neo4j-rust-ext
networkx==3.2.1
nltk==3.8.1
numpy==1.26.4
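Replacing neo4j==5.20.0 with neo4j-rust-ext is meant to be a drop-in swap: the package pulls in the official Python driver and adds Rust-accelerated Bolt packing, so the backend's import path stays the same. A minimal connectivity smoke test, assuming a locally reachable instance and placeholder credentials:

# Placeholder URI/credentials; the import does not change with neo4j-rust-ext installed.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
driver.verify_connectivity()  # raises if the database cannot be reached
driver.close()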
19 changes: 13 additions & 6 deletions backend/score.py
@@ -162,7 +162,7 @@ async def extract_knowledge_graph_from_file(
merged_file_path = os.path.join(MERGED_DIR,file_name)
logging.info(f'File path:{merged_file_path}')
result = await asyncio.to_thread(
extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship)
extract_graph_from_file_local_file, graph, model, merged_file_path, file_name, allowedNodes, allowedRelationship, uri)

elif source_type == 's3 bucket' and source_url:
result = await asyncio.to_thread(
@@ -190,8 +190,14 @@ async def extract_knowledge_graph_from_file(
message=f"Failed To Process File:{file_name} or LLM Unable To Parse Content "
error_message = str(e)
graphDb_data_Access.update_exception_db(file_name,error_message)
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if source_type == 'local file':
delete_file_from_gcs(BUCKET_UPLOAD,file_name)
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
josn_obj = {'message':message,'error_message':error_message, 'file_name': file_name,'status':'Failed','db_url':uri,'failed_count':1, 'source_type': source_type}
logger.log_struct(josn_obj)
logging.exception(f'File Failed in extraction: {josn_obj}')
@@ -351,10 +357,11 @@ async def upload_large_file_into_chunks(file:UploadFile = File(...), chunkNumber
return create_api_response('Success', message=result)
except Exception as e:
# job_status = "Failed"
message="Unable to upload large file into chunks or saving the chunks"
message="Unable to upload large file into chunks. "
error_message = str(e)
logging.info(message)
logging.exception(f'Exception:{error_message}')
return create_api_response('Failed', message=message + error_message[:100], error=error_message, file_name = originalname)
finally:
close_db_connection(graph, 'upload')

@@ -368,11 +375,11 @@ async def get_structured_schema(uri=Form(None), userName=Form(None), password=Fo
logger.log_struct(josn_obj)
return create_api_response('Success', data=result)
except Exception as e:
job_status = "Failed"
message="Unable to get the labels and relationtypes from neo4j database"
error_message = str(e)
logging.info(message)
logging.exception(f'Exception:{error_message}')
return create_api_response("Failed", message=message, error=error_message)
finally:
close_db_connection(graph, 'schema')

@@ -427,7 +434,7 @@ async def delete_document_and_entities(uri=Form(None),
try:
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR)
result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR, uri)
entities_count = result[0]['deletedEntities'] if 'deletedEntities' in result[0] else 0
message = f"Deleted {files_list_size} documents with {entities_count} entities from database"
josn_obj = {'api_name':'delete_document_and_entities','db_url':uri}
@@ -480,7 +487,7 @@ async def get_document_status(file_name, url, userName, password, database):
async def cancelled_job(uri=Form(None), userName=Form(None), password=Form(None), database=Form(None), filenames=Form(None), source_types=Form(None)):
try:
graph = create_graph_database_connection(uri, userName, password, database)
result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR)
result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR, uri)

return create_api_response('Success',message=result)
except Exception as e:
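The score.py endpoints above now thread uri through to the extraction, deletion, and cancellation helpers so that GCS-cached files are grouped per connection. create_gcs_bucket_folder_name_hashed itself is not part of this diff; a hypothetical sketch of such a helper, with the signature and hashing choice assumed:

import hashlib

# Hypothetical sketch only; the real helper elsewhere in the codebase may differ.
def create_gcs_bucket_folder_name_hashed(uri: str, file_name: str) -> str:
    # A stable folder name per (database uri, file name) pair keeps uploads from
    # different connections from colliding inside the shared bucket.
    return hashlib.sha1(f"{uri}/{file_name}".encode("utf-8")).hexdigest()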
13 changes: 6 additions & 7 deletions backend/src/QA_integration.py
@@ -78,14 +78,13 @@ def get_llm(model: str,max_tokens=1000) -> Any:
"""Retrieve the specified language model based on the model name."""

model_versions = {
"OpenAI GPT 3.5": "gpt-3.5-turbo-16k",
"Gemini Pro": "gemini-1.0-pro-001",
"Gemini 1.5 Pro": "gemini-1.5-pro-preview-0409",
"OpenAI GPT 4": "gpt-4-0125-preview",
"Diffbot" : "gpt-4-0125-preview",
"OpenAI GPT 4o":"gpt-4o"
"gpt-3.5": "gpt-3.5-turbo-16k",
"gemini-1.0-pro": "gemini-1.0-pro-001",
"gemini-1.5-pro": "gemini-1.5-pro-preview-0409",
"gpt-4": "gpt-4-0125-preview",
"diffbot" : "gpt-4-0125-preview",
"gpt-4o":"gpt-4o"
}

if model in model_versions:
model_version = model_versions[model]
logging.info(f"Chat Model: {model}, Model Version: {model_version}")
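The model_versions keys move from display names to short lowercase identifiers, so whatever value the caller passes as model must use the new form; old display names now fall through to get_llm's non-matching branch. An illustrative check against the new mapping:

# Illustrative only: lookups against the renamed keys.
model_versions = {
    "gpt-3.5": "gpt-3.5-turbo-16k",
    "diffbot": "gpt-4-0125-preview",
    "gpt-4o": "gpt-4o",
}
assert model_versions["gpt-4o"] == "gpt-4o"
assert "OpenAI GPT 4o" not in model_versions  # old display names no longer resolve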
89 changes: 41 additions & 48 deletions backend/src/chunkid_entities.py
@@ -1,9 +1,7 @@
import logging
from neo4j import graph
from langchain_community.graphs import graph_document
from src.graph_query import *


CHUNK_QUERY = """
match (chunk:Chunk) where chunk.id IN $chunksIds

@@ -15,41 +13,51 @@
RETURN collect(distinct r) as rels
}
WITH d, collect(distinct chunk) as chunks, apoc.coll.toSet(apoc.coll.flatten(collect(rels))) as rels
return [r in rels | [startNode(r), endNode(r), r]] as entities
RETURN d as doc, [chunk in chunks | chunk {.*, embedding:null}] as chunks,
[r in rels | {startNode:{element_id:elementId(startNode(r)), labels:labels(startNode(r)), properties:{id:startNode(r).id,description:startNode(r).description}},
endNode:{element_id:elementId(endNode(r)), labels:labels(endNode(r)), properties:{id:endNode(r).id,description:endNode(r).description}},
relationship: {type:type(r), element_id:elementId(r)}}] as entities
"""

CHUNK_TEXT_QUERY = "match (doc)<-[:PART_OF]-(chunk:Chunk) WHERE chunk.id IN $chunkIds RETURN doc, collect(chunk {.*, embedding:null}) as chunks"


def process_record(record, elements_data):
def process_records(records):
"""
Processes a record to extract and organize node and relationship data.
"""
try:
entities = record["entities"]
for entity in entities:
for element in entity:
element_id = element.element_id
if isinstance(element, graph.Node):
if element_id not in elements_data["seen_nodes"]:
elements_data["seen_nodes"].add(element_id)
node_element = process_node(element)
elements_data["nodes"].append(node_element)
else:
if element_id not in elements_data["seen_relationships"]:
elements_data["seen_relationships"].add(element_id)
nodes = element.nodes
if len(nodes) < 2:
logging.warning(f"Relationship with ID {element_id} does not have two nodes.")
continue
relationship = {
"element_id": element_id,
"type": element.type,
"start_node_element_id": process_node(nodes[0])["element_id"],
"end_node_element_id": process_node(nodes[1])["element_id"],
}
elements_data["relationships"].append(relationship)
return elements_data
try:
nodes = []
relationships = []
seen_nodes = set()
seen_relationships = set()

for record in records:
for element in record["entities"]:
start_node = element['startNode']
end_node = element['endNode']
relationship = element['relationship']

if start_node['element_id'] not in seen_nodes:
nodes.append(start_node)
seen_nodes.add(start_node['element_id'])

if end_node['element_id'] not in seen_nodes:
nodes.append(end_node)
seen_nodes.add(end_node['element_id'])

if relationship['element_id'] not in seen_relationships:
relationships.append({
"element_id": relationship['element_id'],
"type": relationship['type'],
"start_node_element_id": start_node['element_id'],
"end_node_element_id": end_node['element_id']
})
seen_relationships.add(relationship['element_id'])
output = {
"nodes": nodes,
"relationships": relationships
}

return output
except Exception as e:
logging.error(f"chunkid_entities module: An error occurred while extracting the nodes and relationships from records: {e}")

@@ -97,24 +105,9 @@ def get_entities_from_chunkids(uri, username, password, chunk_ids):
chunk_ids_list = chunk_ids.split(",")
driver = get_graphDB_driver(uri, username, password)
records, summary, keys = driver.execute_query(CHUNK_QUERY, chunksIds=chunk_ids_list)
elements_data = {
"nodes": [],
"seen_nodes": set(),
"relationships": [],
"seen_relationships": set()
}
for record in records:
elements_data = process_record(record, elements_data)

result = process_records(records)
logging.info(f"Nodes and relationships are processed")

chunk_data,summary, keys = driver.execute_query(CHUNK_TEXT_QUERY, chunkIds=chunk_ids_list)
chunk_properties = process_chunk_data(chunk_data)
result = {
"nodes": elements_data["nodes"],
"relationships": elements_data["relationships"],
"chunk_data": chunk_properties
}
result["chunk_data"] = process_chunk_data(records)
logging.info(f"Query process completed successfully for chunk ids")
return result

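process_records replaces the old per-record process_record helper: it consumes the plain maps emitted by the rewritten CHUNK_QUERY instead of neo4j.graph objects and deduplicates nodes and relationships by element id. A small illustration with made-up element ids:

from src.chunkid_entities import process_records  # the function defined above

# Input shaped like the entities returned by the new CHUNK_QUERY (ids are invented).
sample_records = [{
    "entities": [{
        "startNode": {"element_id": "4:x:0", "labels": ["Person"],
                      "properties": {"id": "Alice", "description": None}},
        "endNode": {"element_id": "4:x:1", "labels": ["Company"],
                    "properties": {"id": "Acme", "description": None}},
        "relationship": {"type": "WORKS_AT", "element_id": "5:x:0"},
    }],
}]
result = process_records(sample_records)
assert len(result["nodes"]) == 2
assert result["relationships"][0]["start_node_element_id"] == "4:x:0"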
139 changes: 71 additions & 68 deletions backend/src/document_sources/gcs_bucket.py
@@ -8,6 +8,7 @@
import io
from google.oauth2.credentials import Credentials
import time
from .local_file import load_document_content, get_pages_with_page_numbers

def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, creds):
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
@@ -51,88 +52,90 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
blob_name = gcs_bucket_folder+'/'+gcs_blob_filename
else:
blob_name = gcs_blob_filename
#credentials, project_id = google.auth.default()

logging.info(f"GCS project_id : {gcs_project_id}")
#loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=load_pdf)
# pages = loader.load()
# file_name = gcs_blob_filename
#creds= Credentials(access_token)

if access_token is None:
storage_client = storage.Client(project=gcs_project_id)
loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=load_document_content)
pages = loader.load()
if (gcs_blob_filename.split('.')[-1]).lower() != 'pdf':
pages = get_pages_with_page_numbers(pages)
else:
creds= Credentials(access_token)
storage_client = storage.Client(project=gcs_project_id, credentials=creds)
print(f'BLOB Name: {blob_name}')
bucket = storage_client.bucket(gcs_bucket_name)
blob = bucket.blob(blob_name)
content = blob.download_as_bytes()
pdf_file = io.BytesIO(content)
pdf_reader = PdfReader(pdf_file)

# Extract text from all pages
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
pages = [Document(page_content = text)]
return gcs_blob_filename, pages

def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name):
storage_client = storage.Client()

file_name = f'{original_file_name}_part_{chunk_number}'
bucket = storage_client.bucket(bucket_name)
file_data = file_chunk.file.read()
# print(f'data after read {file_data}')

blob = bucket.blob(file_name)
file_io = io.BytesIO(file_data)
blob.upload_from_file(file_io)
# Define the lifecycle rule to delete objects after 6 hours
# rule = {
# "action": {"type": "Delete"},
# "condition": {"age": 1} # Age in days (24 hours = 1 days)
# }

# # Get the current lifecycle policy
# lifecycle = list(bucket.lifecycle_rules)

# # Add the new rule
# lifecycle.append(rule)
bucket = storage_client.bucket(gcs_bucket_name)
blob = bucket.blob(blob_name)
if blob.exists():
content = blob.download_as_bytes()
pdf_file = io.BytesIO(content)
pdf_reader = PdfReader(pdf_file)
# Extract text from all pages
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
pages = [Document(page_content = text)]
else:
raise Exception('Blob Not Found')
return gcs_blob_filename, pages

# # Set the lifecycle policy on the bucket
# bucket.lifecycle_rules = lifecycle
# bucket.patch()
time.sleep(1)
logging.info('Chunk uploaded successfully in gcs')

def merge_file_gcs(bucket_name, original_file_name: str):
def upload_file_to_gcs(file_chunk, chunk_number, original_file_name, bucket_name, folder_name_sha1_hashed):
try:
storage_client = storage.Client()
# Retrieve chunks from GCS
blobs = storage_client.list_blobs(bucket_name, prefix=f"{original_file_name}_part_")
chunks = []
for blob in blobs:
chunks.append(blob.download_as_bytes())
blob.delete()

# Merge chunks into a single file
merged_file = b"".join(chunks)
blob = storage_client.bucket(bucket_name).blob(original_file_name)
logging.info('save the merged file from chunks in gcs')
file_io = io.BytesIO(merged_file)

file_name = f'{original_file_name}_part_{chunk_number}'
bucket = storage_client.bucket(bucket_name)
file_data = file_chunk.file.read()
file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+file_name
logging.info(f'GCS folder pathin upload: {file_name_with__hashed_folder}')
blob = bucket.blob(file_name_with__hashed_folder)
file_io = io.BytesIO(file_data)
blob.upload_from_file(file_io)
pdf_reader = PdfReader(file_io)
file_size = len(merged_file)
total_pages = len(pdf_reader.pages)

return total_pages, file_size
time.sleep(1)
logging.info('Chunk uploaded successfully in gcs')
except Exception as e:
raise Exception('Error in while uploading the file chunks on GCS')

def merge_file_gcs(bucket_name, original_file_name: str, folder_name_sha1_hashed, total_chunks):
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
# Retrieve chunks from GCS
# blobs = storage_client.list_blobs(bucket_name, prefix=folder_name_sha1_hashed)
# print(f'before sorted blobs: {blobs}')
chunks = []
for i in range(1,total_chunks+1):
blob_name = folder_name_sha1_hashed + '/' + f"{original_file_name}_part_{i}"
blob = bucket.blob(blob_name)
if blob.exists():
print(f'Blob Name: {blob.name}')
chunks.append(blob.download_as_bytes())
blob.delete()

merged_file = b"".join(chunks)
file_name_with__hashed_folder = folder_name_sha1_hashed +'/'+original_file_name
logging.info(f'GCS folder path in merge: {file_name_with__hashed_folder}')
blob = storage_client.bucket(bucket_name).blob(file_name_with__hashed_folder)
logging.info('save the merged file from chunks in gcs')
file_io = io.BytesIO(merged_file)
blob.upload_from_file(file_io)
# pdf_reader = PdfReader(file_io)
file_size = len(merged_file)
# total_pages = len(pdf_reader.pages)

return file_size
except Exception as e:
raise Exception('Error in while merge the files chunks on GCS')

def delete_file_from_gcs(bucket_name, file_name):
def delete_file_from_gcs(bucket_name,folder_name, file_name):
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
folder_file_name = folder_name +'/'+file_name
blob = bucket.blob(folder_file_name)
if blob.exists():
blob.delete()
logging.info('File deleted from GCS successfully')
except:
raise Exception('BLOB not exists in GCS')
except Exception as e:
raise Exception(e)
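
Together, the reworked GCS helpers give a chunked upload that is namespaced by a hashed folder: each part is written as <folder>/<name>_part_<i>, merge_file_gcs reassembles parts 1..total_chunks, and delete_file_from_gcs now requires the folder as well. A rough, non-authoritative sketch of the flow the score.py endpoints drive, with the folder-name helper passed in as an assumption:

from src.document_sources.gcs_bucket import (
    upload_file_to_gcs, merge_file_gcs, delete_file_from_gcs,
)

# Sketch only: `parts` stands in for the UploadFile chunks received by the API,
# and `make_folder_name` for the hashed-folder helper used in score.py.
def upload_and_merge(parts, uri, file_name, bucket_name, make_folder_name):
    folder = make_folder_name(uri, file_name)
    for i, part in enumerate(parts, start=1):
        upload_file_to_gcs(part, i, file_name, bucket_name, folder)
    file_size = merge_file_gcs(bucket_name, file_name, folder, total_chunks=len(parts))
    return folder, file_size

# Later, once processing finishes (or fails), the cached copy can be removed:
# delete_file_from_gcs(bucket_name, folder, file_name)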