Staging to Main #1173

Merged · 14 commits · Mar 11, 2025
6 changes: 3 additions & 3 deletions .github/dependabot.yml
@@ -3,11 +3,11 @@ updates:
- package-ecosystem: 'npm'
directory: '/frontend'
schedule:
interval: 'weekly'
interval: 'monthly'
target-branch: 'dev'

- package-ecosystem: 'pip'
directory: '/backend'
schedule:
interval: 'weekly'
target-branch: 'dev'
interval: 'monthly'
target-branch: 'dev'
2 changes: 2 additions & 0 deletions backend/requirements.txt
@@ -59,3 +59,5 @@ Secweb==1.11.0
ragas==0.2.11
rouge_score==0.1.2
langchain-neo4j==0.3.0
pypandoc-binary==1.15
chardet==5.2.0
34 changes: 31 additions & 3 deletions backend/score.py
@@ -41,6 +41,27 @@
CHUNK_DIR = os.path.join(os.path.dirname(__file__), "chunks")
MERGED_DIR = os.path.join(os.path.dirname(__file__), "merged_files")

def sanitize_filename(filename):
"""
Sanitize the user-provided filename to prevent directory traversal and remove unsafe characters.
"""
# Remove path separators and collapse redundant separators
filename = os.path.basename(filename)
filename = os.path.normpath(filename)
return filename

def validate_file_path(directory, filename):
"""
Construct the full file path and ensure it is within the specified directory.
"""
file_path = os.path.join(directory, filename)
abs_directory = os.path.abspath(directory)
abs_file_path = os.path.abspath(file_path)
# Ensure the file path starts with the intended directory path
if not abs_file_path.startswith(abs_directory):
raise ValueError("Invalid file path")
return abs_file_path

def healthy_condition():
output = {"healthy": True}
return output
@@ -217,8 +238,9 @@ async def extract_knowledge_graph_from_file(
start_time = time.time()
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
merged_file_path = os.path.join(MERGED_DIR,file_name)
if source_type == 'local file':
file_name = sanitize_filename(file_name)
merged_file_path = validate_file_path(MERGED_DIR, file_name)
uri_latency, result = await extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions)

elif source_type == 's3 bucket' and source_url:
@@ -278,8 +300,11 @@ async def extract_knowledge_graph_from_file(
return create_api_response('Success', data=result, file_source= source_type)
except LLMGraphBuilderException as e:
error_message = str(e)
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
failed_file_process(uri,file_name, merged_file_path, source_type)
if source_type == 'local file':
failed_file_process(uri,file_name, merged_file_path)
node_detail = graphDb_data_Access.get_current_status_document_node(file_name)
# Set the status "Completed" in logging because these errors are already handled by the application and treated as custom errors.
json_obj = {'api_name':'extract','message':error_message,'file_created_at':formatted_time(node_detail[0]['created_time']),'error_message':error_message, 'file_name': file_name,'status':'Completed',
@@ -290,8 +315,11 @@
except Exception as e:
message=f"Failed To Process File:{file_name} or LLM Unable To Parse Content "
error_message = str(e)
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
graphDb_data_Access.update_exception_db(file_name,error_message, retry_condition)
failed_file_process(uri,file_name, merged_file_path, source_type)
if source_type == 'local file':
failed_file_process(uri,file_name, merged_file_path)
node_detail = graphDb_data_Access.get_current_status_document_node(file_name)

json_obj = {'api_name':'extract','message':message,'file_created_at':formatted_time(node_detail[0]['created_time']),'error_message':error_message, 'file_name': file_name,'status':'Failed',
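
For context, a minimal standalone sketch of how the two new helpers are meant to guard the merged-file path. The directory and filenames below are invented for illustration; score.py builds MERGED_DIR from __file__.

```python
import os

MERGED_DIR = "/app/merged_files"  # illustrative; score.py derives this from __file__

def sanitize_filename(filename):
    # Drop any client-supplied directory components, keep only the base name.
    return os.path.normpath(os.path.basename(filename))

def validate_file_path(directory, filename):
    abs_directory = os.path.abspath(directory)
    abs_file_path = os.path.abspath(os.path.join(directory, filename))
    # The final path must stay inside the intended directory.
    if not abs_file_path.startswith(abs_directory):
        raise ValueError("Invalid file path")
    return abs_file_path

# A traversal attempt is reduced to its basename and stays inside MERGED_DIR.
print(validate_file_path(MERGED_DIR, sanitize_filename("../../etc/passwd")))
# -> /app/merged_files/passwd

# Passing the raw value straight to validate_file_path would instead escape the
# directory and trip the prefix check:
# validate_file_path(MERGED_DIR, "../../etc/passwd")  # raises ValueError
```
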
21 changes: 16 additions & 5 deletions backend/src/QA_integration.py
@@ -177,7 +177,7 @@ def get_rag_chain(llm, system_template=CHAT_SYSTEM_TEMPLATE):
logging.error(f"Error creating RAG chain: {e}")
raise

def format_documents(documents, model):
def format_documents(documents, model,chat_mode_settings):
prompt_token_cutoff = 4
for model_names, value in CHAT_TOKEN_CUT_OFF.items():
if model in model_names:
@@ -197,9 +197,20 @@ def format_documents(documents, model):
try:
source = doc.metadata.get('source', "unknown")
sources.add(source)

entities = doc.metadata['entities'] if 'entities'in doc.metadata.keys() else entities
global_communities = doc.metadata["communitydetails"] if 'communitydetails'in doc.metadata.keys() else global_communities
if 'entities' in doc.metadata:
if chat_mode_settings["mode"] == CHAT_ENTITY_VECTOR_MODE:
entity_ids = [entry['entityids'] for entry in doc.metadata['entities'] if 'entityids' in entry]
entities.setdefault('entityids', set()).update(entity_ids)
else:
if 'entityids' in doc.metadata['entities']:
entities.setdefault('entityids', set()).update(doc.metadata['entities']['entityids'])
if 'relationshipids' in doc.metadata['entities']:
entities.setdefault('relationshipids', set()).update(doc.metadata['entities']['relationshipids'])

if 'communitydetails' in doc.metadata:
existing_ids = {entry['id'] for entry in global_communities}
new_entries = [entry for entry in doc.metadata["communitydetails"] if entry['id'] not in existing_ids]
global_communities.extend(new_entries)

formatted_doc = (
"Document start\n"
@@ -218,7 +229,7 @@ def process_documents(docs, question, messages, llm, model,chat_mode_settings):
start_time = time.time()

try:
formatted_docs, sources, entitydetails, communities = format_documents(docs, model)
formatted_docs, sources, entitydetails, communities = format_documents(docs, model,chat_mode_settings)

rag_chain = get_rag_chain(llm=llm)

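
The reworked format_documents accumulates entity and relationship ids into sets (keyed by chat mode) and merges communitydetails entries while de-duplicating on id. A small standalone sketch of that de-duplication step; the community dicts are invented.

```python
# Merge incoming community details into the running list, skipping ids already
# seen, mirroring the logic added to format_documents above.
global_communities = [{"id": "c1", "summary": "existing community"}]
incoming = [{"id": "c1", "summary": "duplicate"}, {"id": "c2", "summary": "new community"}]

existing_ids = {entry["id"] for entry in global_communities}
new_entries = [entry for entry in incoming if entry["id"] not in existing_ids]
global_communities.extend(new_entries)

print([entry["id"] for entry in global_communities])  # -> ['c1', 'c2']
```
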
2 changes: 1 addition & 1 deletion backend/src/chunkid_entities.py
@@ -190,7 +190,7 @@ def get_entities_from_chunkids(uri, username, password, database ,nodedetails,en
elif mode == CHAT_ENTITY_VECTOR_MODE:

if "entitydetails" in nodedetails and nodedetails["entitydetails"]:
entity_ids = [item["id"] for item in nodedetails["entitydetails"]]
entity_ids = [item for item in nodedetails["entitydetails"]["entityids"]]
logging.info(f"chunkid_entities module: Starting for entity ids: {entity_ids}")
result = process_entityids(driver, entity_ids)
if "chunk_data" in result.keys():
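
This change assumes the entity-vector chat payload now carries an entityids list under entitydetails rather than a list of {'id': ...} dicts, which appears to line up with the set built in format_documents above. A sketch with invented ids:

```python
# Shape consumed before this PR:
nodedetails_old = {"entitydetails": [{"id": "4:abc:1"}, {"id": "4:abc:2"}]}

# Shape consumed after this PR:
nodedetails_new = {"entitydetails": {"entityids": ["4:abc:1", "4:abc:2"]}}

entity_ids = [item for item in nodedetails_new["entitydetails"]["entityids"]]
print(entity_ids)  # -> ['4:abc:1', '4:abc:2']
```
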
10 changes: 5 additions & 5 deletions backend/src/document_sources/gcs_bucket.py
@@ -1,8 +1,7 @@
import os
import logging
from google.cloud import storage
from langchain_community.document_loaders import GCSFileLoader, GCSDirectoryLoader
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import GCSFileLoader
from langchain_core.documents import Document
from PyPDF2 import PdfReader
import io
@@ -42,8 +41,9 @@ def get_gcs_bucket_files_info(gcs_project_id, gcs_bucket_name, gcs_bucket_folder
logging.exception(f'Exception Stack trace: {error_message}')
raise LLMGraphBuilderException(error_message)

def load_pdf(file_path):
return PyMuPDFLoader(file_path)
def gcs_loader_func(file_path):
loader, _ = load_document_content(file_path)
return loader

def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token=None):
nltk.download('punkt')
@@ -64,7 +64,7 @@ def get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, g
blob = bucket.blob(blob_name)

if blob.exists():
loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=load_document_content)
loader = GCSFileLoader(project_name=gcs_project_id, bucket=gcs_bucket_name, blob=blob_name, loader_func=gcs_loader_func)
pages = loader.load()
else :
raise LLMGraphBuilderException('File does not exist, Please re-upload the file and try again.')
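
The adapter exists because load_document_content now returns a (loader, encoding_flag) tuple (see local_file.py below), while GCSFileLoader's loader_func must return a loader object. A hedged sketch with a local stand-in for load_document_content and placeholder GCS names:

```python
from langchain_community.document_loaders import GCSFileLoader, PyMuPDFLoader

def load_document_content(file_path):
    # Stand-in for the real helper in local_file.py: returns (loader, encoding_flag).
    return PyMuPDFLoader(file_path), False

def gcs_loader_func(file_path):
    # GCSFileLoader passes the downloaded blob's local path here and expects a
    # loader back, so the encoding flag is dropped.
    loader, _ = load_document_content(file_path)
    return loader

loader = GCSFileLoader(
    project_name="my-gcp-project",  # placeholder
    bucket="my-bucket",             # placeholder
    blob="docs/report.pdf",         # placeholder
    loader_func=gcs_loader_func,
)
# pages = loader.load()  # needs real GCS credentials; shown for the call shape only
```
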
63 changes: 47 additions & 16 deletions backend/src/document_sources/local_file.py
@@ -3,30 +3,61 @@
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain_core.documents import Document
import chardet
from langchain_core.document_loaders import BaseLoader

class ListLoader(BaseLoader):
"""A wrapper to make a list of Documents compatible with BaseLoader."""
def __init__(self, documents):
self.documents = documents
def load(self):
return self.documents

def detect_encoding(file_path):
"""Detects the file encoding to avoid UnicodeDecodeError."""
with open(file_path, 'rb') as f:
raw_data = f.read(4096)
result = chardet.detect(raw_data)
return result['encoding'] or "utf-8"

def load_document_content(file_path):
if Path(file_path).suffix.lower() == '.pdf':
return PyMuPDFLoader(file_path)
file_extension = Path(file_path).suffix.lower()
encoding_flag = False
if file_extension == '.pdf':
loader = PyMuPDFLoader(file_path)
return loader,encoding_flag
elif file_extension == ".txt":
encoding = detect_encoding(file_path)
logging.info(f"Detected encoding for {file_path}: {encoding}")
if encoding.lower() == "utf-8":
loader = UnstructuredFileLoader(file_path, mode="elements",autodetect_encoding=True)
return loader,encoding_flag
else:
with open(file_path, encoding=encoding, errors="replace") as f:
content = f.read()
loader = ListLoader([Document(page_content=content, metadata={"source": file_path})])
encoding_flag = True
return loader,encoding_flag
else:
return UnstructuredFileLoader(file_path, mode="elements",autodetect_encoding=True)
loader = UnstructuredFileLoader(file_path, mode="elements",autodetect_encoding=True)
return loader,encoding_flag

def get_documents_from_file_by_path(file_path,file_name):
file_path = Path(file_path)
if file_path.exists():
logging.info(f'file {file_name} processing')
file_extension = file_path.suffix.lower()
try:
loader = load_document_content(file_path)
if file_extension == ".pdf":
pages = loader.load()
else:
unstructured_pages = loader.load()
pages= get_pages_with_page_numbers(unstructured_pages)
except Exception as e:
raise Exception('Error while reading the file content or metadata')
else:
if not file_path.exists():
logging.info(f'File {file_name} does not exist')
raise Exception(f'File {file_name} does not exist')
logging.info(f'file {file_name} processing')
try:
loader, encoding_flag = load_document_content(file_path)
file_extension = file_path.suffix.lower()
if file_extension == ".pdf" or (file_extension == ".txt" and encoding_flag):
pages = loader.load()
else:
unstructured_pages = loader.load()
pages = get_pages_with_page_numbers(unstructured_pages)
except Exception as e:
raise Exception(f'Error while reading the file content or metadata, {e}')
return file_name, pages , file_extension

def get_pages_with_page_numbers(unstructured_pages):
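
A small standalone sketch of the new encoding fallback for .txt files: detect the encoding from a byte sample with chardet, then read with errors="replace" and wrap the text in a Document, as ListLoader does above. The sample file is created inline purely for illustration.

```python
import chardet
from langchain_core.documents import Document

# Create a non-UTF-8 text file to exercise the fallback path.
with open("sample-latin1.txt", "wb") as f:
    f.write("café in München, naïve résumé".encode("latin-1"))

with open("sample-latin1.txt", "rb") as f:
    detected = chardet.detect(f.read(4096))
encoding = detected["encoding"] or "utf-8"  # chardet may return None for tiny samples

with open("sample-latin1.txt", encoding=encoding, errors="replace") as f:
    doc = Document(page_content=f.read(), metadata={"source": "sample-latin1.txt"})

print(encoding, "->", doc.page_content)
```
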
8 changes: 1 addition & 7 deletions backend/src/document_sources/web_pages.py
@@ -5,12 +5,6 @@
def get_documents_from_web_page(source_url:str):
try:
pages = WebBaseLoader(source_url, verify_ssl=False).load()
try:
file_name = pages[0].metadata['title'].strip()
if not file_name:
file_name = last_url_segment(source_url)
except:
file_name = last_url_segment(source_url)
return file_name, pages
return pages
except Exception as e:
raise LLMGraphBuilderException(str(e))
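
get_documents_from_web_page now returns only the loaded pages; deriving a display title from the page metadata (or the URL's last segment) is left to the caller in main.py below. The underlying loader call is unchanged; a minimal sketch with a placeholder URL:

```python
from langchain_community.document_loaders import WebBaseLoader

pages = WebBaseLoader("https://example.com", verify_ssl=False).load()
print(len(pages), pages[0].metadata.get("title"), pages[0].metadata.get("language"))
```
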
2 changes: 1 addition & 1 deletion backend/src/document_sources/wikipedia.py
@@ -4,7 +4,7 @@

def get_documents_from_Wikipedia(wiki_query:str, language:str):
try:
pages = WikipediaLoader(query=wiki_query.strip(), lang=language, load_all_available_meta=False).load()
pages = WikipediaLoader(query=wiki_query.strip(), lang=language, load_all_available_meta=False,doc_content_chars_max=100000,load_max_docs=1).load()
file_name = wiki_query.strip()
logging.info(f"Total Pages from Wikipedia = {len(pages)}")
return file_name, pages
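
The two new arguments appear intended to bound how much text a single query pulls in: at most one document, truncated to 100,000 characters. A hedged sketch with an illustrative query:

```python
from langchain_community.document_loaders import WikipediaLoader

pages = WikipediaLoader(
    query="Neo4j",                 # illustrative query
    lang="en",
    load_all_available_meta=False,
    doc_content_chars_max=100000,  # cap page content length
    load_max_docs=1,               # only the top match instead of the default batch
).load()
print(len(pages), len(pages[0].page_content))
```
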
14 changes: 10 additions & 4 deletions backend/src/graphDB_dataAccess.py
@@ -198,10 +198,7 @@ def check_account_access(self, database):
def check_gds_version(self):
try:
gds_procedure_count = """
SHOW PROCEDURES
YIELD name
WHERE name STARTS WITH "gds."
RETURN COUNT(*) AS totalGdsProcedures
SHOW FUNCTIONS YIELD name WHERE name STARTS WITH 'gds.version' RETURN COUNT(*) AS totalGdsProcedures
"""
result = self.graph.query(gds_procedure_count)
total_gds_procedures = result[0]['totalGdsProcedures'] if result else 0
@@ -564,3 +561,12 @@ def get_nodelabels_relationships(self):
except Exception as e:
print(f"Error in getting node labels/relationship types from db: {e}")
return []

def get_websource_url(self,file_name):
logging.info("Checking if same title with different URL exist in db ")
query = """
MATCH(d:Document {fileName : $file_name}) WHERE d.fileSource = "web-url"
RETURN d.url AS url
"""
param = {"file_name" : file_name}
return self.execute_query(query, param)
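
get_websource_url fetches the URL already stored for a web-sourced Document with the given title; main.py (below) uses it to detect when the same page title arrives from a different URL. An equivalent standalone lookup via the Neo4j Python driver, with placeholder connection details:

```python
from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))  # placeholders

query = """
MATCH (d:Document {fileName: $file_name}) WHERE d.fileSource = "web-url"
RETURN d.url AS url
"""

with driver.session() as session:
    record = session.run(query, file_name="Example Domain").single()
    existing_url = record["url"] if record else None

print(existing_url)  # None if no web-sourced Document has that title yet
driver.close()
```
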
28 changes: 26 additions & 2 deletions backend/src/llm.py
@@ -14,6 +14,7 @@
import boto3
import google.auth
from src.shared.constants import ADDITIONAL_INSTRUCTIONS
import re

def get_llm(model: str):
"""Retrieve the specified language model based on the model name."""
@@ -166,7 +167,8 @@ def get_chunk_id_as_doc_metadata(chunkId_chunkDoc_list):
async def get_graph_document_list(
llm, combined_chunk_document_list, allowedNodes, allowedRelationship, additional_instructions=None
):
futures = []
if additional_instructions:
additional_instructions = sanitize_additional_instruction(additional_instructions)
graph_document_list = []
if "diffbot_api_key" in dir(llm):
llm_transformer = llm
@@ -210,4 +212,26 @@ async def get_graph_from_llm(model, chunkId_chunkDoc_list, allowedNodes, allowed
graph_document_list = await get_graph_document_list(
llm, combined_chunk_document_list, allowedNodes, allowedRelationship, additional_instructions
)
return graph_document_list
return graph_document_list

def sanitize_additional_instruction(instruction: str) -> str:
"""
Sanitizes additional instruction by:
- Replacing curly braces `{}` with `[]` to prevent variable interpretation.
- Removing potential injection patterns like `os.getenv()`, `eval()`, `exec()`.
- Stripping problematic special characters.
- Normalizing whitespace.
Args:
instruction (str): Raw additional instruction input.
Returns:
str: Sanitized instruction safe for LLM processing.
"""
logging.info("Sanitizing additional instructions")
instruction = instruction.replace("{", "[").replace("}", "]") # Convert `{}` to `[]` for safety
# Step 2: Block dangerous function calls
injection_patterns = [r"os\.getenv\(", r"eval\(", r"exec\(", r"subprocess\.", r"import os", r"import subprocess"]
for pattern in injection_patterns:
instruction = re.sub(pattern, "[BLOCKED]", instruction, flags=re.IGNORECASE)
# Step 4: Normalize spaces
instruction = re.sub(r'\s+', ' ', instruction).strip()
return instruction
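
A quick standalone demonstration of what the sanitizer does to a hostile instruction string. The input is invented; the steps restate the function body above. Swapping the braces keeps user text from being interpreted as prompt-template variables downstream.

```python
import re

instruction = "Use {entity} labels and eval(os.getenv('SECRET')) if needed.\n\nKeep it short."

instruction = instruction.replace("{", "[").replace("}", "]")
for pattern in [r"os\.getenv\(", r"eval\(", r"exec\(", r"subprocess\.",
                r"import os", r"import subprocess"]:
    instruction = re.sub(pattern, "[BLOCKED]", instruction, flags=re.IGNORECASE)
instruction = re.sub(r"\s+", " ", instruction).strip()

print(instruction)
# -> Use [entity] labels and [BLOCKED][BLOCKED]'SECRET')) if needed. Keep it short.
```
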
28 changes: 16 additions & 12 deletions backend/src/main.py
@@ -124,7 +124,12 @@ def create_source_node_graph_web_url(graph, model, source_url, source_type):
raise LLMGraphBuilderException(message)
try:
title = pages[0].metadata['title'].strip()
if not title:
if title:
graphDb_data_Access = graphDBdataAccess(graph)
existing_url = graphDb_data_Access.get_websource_url(title)
if existing_url != source_url:
title = str(title) + "-" + str(last_url_segment(source_url)).strip()
else:
title = last_url_segment(source_url)
language = pages[0].metadata['language']
except:
@@ -253,7 +258,7 @@ async def extract_graph_from_file_s3(uri, userName, password, database, model, s

async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
file_name, pages = get_documents_from_web_page(source_url)
pages = get_documents_from_web_page(source_url)
if pages==None or len(pages)==0:
raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
return await processing_source(uri, userName, password, database, model, file_name, pages, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, additional_instructions=additional_instructions)
@@ -742,14 +747,13 @@ def set_status_retry(graph, file_name, retry_condition):
logging.info(obj_source_node)
graphDb_data_Access.update_source_node(obj_source_node)

def failed_file_process(uri,file_name, merged_file_path, source_type):
def failed_file_process(uri,file_name, merged_file_path):
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if source_type == 'local file':
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
time.sleep(5)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri,file_name)
copy_failed_file(BUCKET_UPLOAD, BUCKET_FAILED_FILE, folder_name, file_name)
time.sleep(5)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
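
A worked, standalone illustration of the new title handling in create_source_node_graph_web_url: when the scraped title is already stored in the graph under a different URL, the last URL segment is appended to keep fileName unique. last_url_segment here is a stand-in for the repository helper and all values are invented.

```python
def last_url_segment(url: str) -> str:
    # Stand-in for the helper used in main.py.
    return url.rstrip("/").split("/")[-1]

source_url = "https://example.com/docs/getting-started"
title = "Getting Started"                      # pages[0].metadata['title']
existing_url = "https://other-site.org/start"  # what get_websource_url returned

if title:
    if existing_url != source_url:
        # Same title, different URL -> disambiguate the fileName.
        title = str(title) + "-" + str(last_url_segment(source_url)).strip()
else:
    title = last_url_segment(source_url)

print(title)  # -> Getting Started-getting-started
```
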