
DO NOT MERGE - Document, chunk node labels and relation labels updated with underscore #626


Merged: 11 commits, Aug 1, 2024
10 changes: 5 additions & 5 deletions backend/src/QA_integration.py
@@ -30,18 +30,18 @@


# RETRIEVAL_QUERY = """
- # WITH node, score, apoc.text.join([ (node)-[:HAS_ENTITY]->(e) | head(labels(e)) + ": "+ e.id],", ") as entities
- # MATCH (node)-[:PART_OF]->(d:Document)
+ # WITH node, score, apoc.text.join([ (node)-[:__HAS_ENTITY__]->(e) | head(labels(e)) + ": "+ e.id],", ") as entities
+ # MATCH (node)-[:__PART_OF__]->(d:Document)
# WITH d, apoc.text.join(collect(node.text + "\n" + entities),"\n----\n") as text, avg(score) as score
# RETURN text, score, {source: COALESCE(CASE WHEN d.url CONTAINS "None" THEN d.fileName ELSE d.url END, d.fileName)} as metadata
# """

RETRIEVAL_QUERY = """
WITH node as chunk, score
- MATCH (chunk)-[:PART_OF]->(d:Document)
+ MATCH (chunk)-[:__PART_OF__]->(d:__Document__)
CALL { WITH chunk
- MATCH (chunk)-[:HAS_ENTITY]->(e)
- MATCH path=(e)(()-[rels:!HAS_ENTITY&!PART_OF]-()){0,3}(:!Chunk&!Document)
+ MATCH (chunk)-[:__HAS_ENTITY__]->(e)
+ MATCH path=(e)(()-[rels:!__HAS_ENTITY__&!__PART_OF__]-()){0,3}(:!__Chunk__&!__Document__)
UNWIND rels as r
RETURN collect(distinct r) as rels
}
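
The rewritten query leans on newer Cypher: label expressions such as :!__Chunk__&!__Document__ and the quantified path pattern {0,3} need a recent Neo4j 5 release. As a minimal sketch, the pattern can be exercised standalone with the official Python driver; the URI, credentials, and chunk id below are placeholders, not values from this PR:

# Sketch: run the underscored retrieval pattern directly.
from neo4j import GraphDatabase

query = """
MATCH (chunk:__Chunk__ {id: $chunk_id})-[:__PART_OF__]->(d:__Document__)
CALL { WITH chunk
    MATCH (chunk)-[:__HAS_ENTITY__]->(e)
    MATCH path=(e)(()-[rels:!__HAS_ENTITY__&!__PART_OF__]-()){0,3}(:!__Chunk__&!__Document__)
    UNWIND rels as r
    RETURN collect(distinct r) as rels
}
RETURN d.fileName AS file, size(rels) AS neighbourRelCount
"""

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    for record in session.run(query, chunk_id="some-chunk-id"):
        print(record["file"], record["neighbourRelCount"])
driver.close()
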
6 changes: 3 additions & 3 deletions backend/src/QA_optimization.py
@@ -46,13 +46,13 @@ async def _vector_embed_results(self):
t=datetime.now()
print("Vector embeddings start time",t)
# retrieval_query="""
- # MATCH (node)-[:PART_OF]->(d:Document)
+ # MATCH (node)-[:__PART_OF__]->(d:Document)
# WITH d, apoc.text.join(collect(node.text),"\n----\n") as text, avg(score) as score
# RETURN text, score, {source: COALESCE(CASE WHEN d.url CONTAINS "None" THEN d.fileName ELSE d.url END, d.fileName)} as metadata
# """
retrieval_query="""
- WITH node, score, apoc.text.join([ (node)-[:HAS_ENTITY]->(e) | head(labels(e)) + ": "+ e.id],", ") as entities
- MATCH (node)-[:PART_OF]->(d:Document)
+ WITH node, score, apoc.text.join([ (node)-[:__HAS_ENTITY__]->(e) | head(labels(e)) + ": "+ e.id],", ") as entities
+ MATCH (node)-[:__PART_OF__]->(d:__Document__)
WITH d, apoc.text.join(collect(node.text + "\n" + entities),"\n----\n") as text, avg(score) as score
RETURN text, score, {source: COALESCE(CASE WHEN d.url CONTAINS "None" THEN d.fileName ELSE d.url END, d.fileName)} as metadata
"""
8 changes: 4 additions & 4 deletions backend/src/chunkid_entities.py
@@ -3,12 +3,12 @@
from src.graph_query import *

CHUNK_QUERY = """
- match (chunk:Chunk) where chunk.id IN $chunksIds
+ match (chunk:__Chunk__) where chunk.id IN $chunksIds

- MATCH (chunk)-[:PART_OF]->(d:Document)
+ MATCH (chunk)-[:__PART_OF__]->(d:__Document__)
CALL {WITH chunk
- MATCH (chunk)-[:HAS_ENTITY]->(e)
- MATCH path=(e)(()-[rels:!HAS_ENTITY&!PART_OF]-()){0,2}(:!Chunk&!Document)
+ MATCH (chunk)-[:__HAS_ENTITY__]->(e)
+ MATCH path=(e)(()-[rels:!__HAS_ENTITY__&!__PART_OF__]-()){0,2}(:!__Chunk__&!__Document__)
UNWIND rels as r
RETURN collect(distinct r) as rels
}
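
CHUNK_QUERY is parameterized only by $chunksIds, so it is easy to exercise in isolation. A minimal sketch with the Python driver; connection details and ids are placeholders:

# Sketch: fetch the entity neighbourhood for a batch of chunk ids.
from neo4j import GraphDatabase
from src.chunkid_entities import CHUNK_QUERY

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    records = list(session.run(CHUNK_QUERY, chunksIds=["chunk-id-1", "chunk-id-2"]))
    print(f"rows returned: {len(records)}")
driver.close()
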
44 changes: 22 additions & 22 deletions backend/src/graphDB_dataAccess.py
@@ -20,7 +20,7 @@ def update_exception_db(self, file_name, exp_msg):
is_cancelled_status = result[0]['is_cancelled']
if bool(is_cancelled_status) == True:
job_status = 'Cancelled'
self.graph.query("""MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""",
self.graph.query("""MERGE(d:__Document__ {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""",
{"fName":file_name, "status":job_status, "error_msg":exp_msg})
except Exception as e:
error_message = str(e)
@@ -31,7 +31,7 @@ def create_source_node(self, obj_source_node:sourceNode):
try:
job_status = "New"
logging.info("creating source node if does not exist")
self.graph.query("""MERGE(d:Document {fileName :$fn}) SET d.fileSize = $fs, d.fileType = $ft ,
self.graph.query("""MERGE(d:__Document__ {fileName :$fn}) SET d.fileSize = $fs, d.fileType = $ft ,
d.status = $st, d.url = $url, d.awsAccessKeyId = $awsacc_key_id,
d.fileSource = $f_source, d.createdAt = $c_at, d.updatedAt = $u_at,
d.processingTime = $pt, d.errorMessage = $e_message, d.nodeCount= $n_count,
@@ -95,7 +95,7 @@ def update_source_node(self, obj_source_node:sourceNode):
param= {"props":params}

print(f'Base Param value 1 : {param}')
query = "MERGE(d:Document {fileName :$props.fileName}) SET d += $props"
query = "MERGE(d:__Document__ {fileName :$props.fileName}) SET d += $props"
logging.info("Update source node properties")
self.graph.query(query,param)
except Exception as e:
@@ -117,7 +117,7 @@ def get_source_list(self):
sorting the list by the last updated date.
"""
logging.info("Get existing files list from graph")
query = "MATCH(d:Document) WHERE d.fileName IS NOT NULL RETURN d ORDER BY d.updatedAt DESC"
query = "MATCH(d:__Document__) WHERE d.fileName IS NOT NULL RETURN d ORDER BY d.updatedAt DESC"
result = self.graph.query(query)
list_of_json_objects = [entry['d'] for entry in result]
return list_of_json_objects
@@ -131,10 +131,10 @@ def update_KNN_graph(self):
knn_min_score = os.environ.get('KNN_MIN_SCORE')
if len(index) > 0:
logging.info('update KNN graph')
self.graph.query("""MATCH (c:Chunk)
WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
self.graph.query("""MATCH (c:__Chunk__)
WHERE c.embedding IS NOT NULL AND count { (c)-[:__SIMILAR__]-() } < 5
CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score
- WHERE node <> c and score >= $score MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score
+ WHERE node <> c and score >= $score MERGE (c)-[rel:__SIMILAR__]-(node) SET rel.score = score
""",
{"score":float(knn_min_score)}
)
@@ -174,7 +174,7 @@ def execute_query(self, query, param=None):

def get_current_status_document_node(self, file_name):
query = """
- MATCH(d:Document {fileName : $file_name}) RETURN d.status AS Status , d.processingTime AS processingTime,
+ MATCH(d:__Document__ {fileName : $file_name}) RETURN d.status AS Status , d.processingTime AS processingTime,
d.nodeCount AS nodeCount, d.model as model, d.relationshipCount as relationshipCount,
d.total_pages AS total_pages, d.total_chunks AS total_chunks , d.fileSize as fileSize,
d.is_cancelled as is_cancelled, d.processed_chunk as processed_chunk, d.fileSource as fileSource
@@ -197,23 +197,23 @@ def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, me
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
query_to_delete_document="""
- MATCH (d:Document) where d.fileName in $filename_list and d.fileSource in $source_types_list
+ MATCH (d:__Document__) where d.fileName in $filename_list and d.fileSource in $source_types_list
with collect(d) as documents
unwind documents as d
- optional match (d)<-[:PART_OF]-(c:Chunk)
+ optional match (d)<-[:__PART_OF__]-(c:__Chunk__)
detach delete c, d
return count(*) as deletedChunks
"""
query_to_delete_document_and_entities="""
- MATCH (d:Document) where d.fileName in $filename_list and d.fileSource in $source_types_list
+ MATCH (d:__Document__) where d.fileName in $filename_list and d.fileSource in $source_types_list
with collect(d) as documents
unwind documents as d
- optional match (d)<-[:PART_OF]-(c:Chunk)
+ optional match (d)<-[:__PART_OF__]-(c:__Chunk__)
// if delete-entities checkbox is set
call { with c, documents
- match (c)-[:HAS_ENTITY]->(e)
+ match (c)-[:__HAS_ENTITY__]->(e)
// belongs to another document
- where not exists { (d2)<-[:PART_OF]-()-[:HAS_ENTITY]->(e) WHERE NOT d2 IN documents }
+ where not exists { (d2)<-[:__PART_OF__]-()-[:__HAS_ENTITY__]->(e) WHERE NOT d2 IN documents }
detach delete e
return count(*) as entities
}
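
Both deletion queries take the same two list parameters; which one runs depends on the delete-entities flag. A sketch of the call shape from inside this class, with illustrative values:

# Sketch: delete documents plus any entities that belong to them alone.
param = {
    "filename_list": ["example.pdf"],      # illustrative
    "source_types_list": ["local file"],   # illustrative
}
result = self.graph.query(query_to_delete_document_and_entities, param)
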
@@ -231,17 +231,17 @@ def delete_file_from_graph(self, filenames, source_types, deleteEntities:str, me

def list_unconnected_nodes(self):
query = """
- MATCH (e:!Chunk&!Document)
- WHERE NOT exists { (e)--(:!Chunk&!Document) }
- OPTIONAL MATCH (doc:Document)<-[:PART_OF]-(c:Chunk)-[:HAS_ENTITY]->(e)
+ MATCH (e:!__Chunk__&!__Document__)
+ WHERE NOT exists { (e)--(:!__Chunk__&!__Document__) }
+ OPTIONAL MATCH (doc:__Document__)<-[:__PART_OF__]-(c:__Chunk__)-[:__HAS_ENTITY__]->(e)
RETURN e {.*, embedding:null, elementId:elementId(e), labels:labels(e)} as e,
collect(distinct doc.fileName) as documents, count(distinct c) as chunkConnections
ORDER BY e.id ASC
LIMIT 100
"""
query_total_nodes = """
- MATCH (e:!Chunk&!Document)
- WHERE NOT exists { (e)--(:!Chunk&!Document) }
+ MATCH (e:!__Chunk__&!__Document__)
+ WHERE NOT exists { (e)--(:!__Chunk__&!__Document__) }
RETURN count(*) as total
"""
nodes_list = self.execute_query(query)
@@ -261,7 +261,7 @@ def get_duplicate_nodes_list(self):
score_value = float(os.environ.get('DUPLICATE_SCORE_VALUE'))
text_distance = int(os.environ.get('DUPLICATE_TEXT_DISTANCE'))
query_duplicate_nodes = """
- MATCH (n:!Chunk&!Document) with n
+ MATCH (n:!__Chunk__&!__Document__) with n
WHERE n.embedding is not null and n.id is not null // and size(n.id) > 3
WITH n ORDER BY count {{ (n)--() }} DESC, size(n.id) DESC // updated
WITH collect(n) as nodes
@@ -289,7 +289,7 @@ def get_duplicate_nodes_list(self):
where none(other in all where other <> nodes and size(other) > size(nodes) and size(apoc.coll.subtract(nodes, other))=0)
return head(nodes) as n, tail(nodes) as similar
}}
- OPTIONAL MATCH (doc:Document)<-[:PART_OF]-(c:Chunk)-[:HAS_ENTITY]->(n)
+ OPTIONAL MATCH (doc:__Document__)<-[:__PART_OF__]-(c:__Chunk__)-[:__HAS_ENTITY__]->(n)
{return_statement}
"""
return_query_duplicate_nodes = """
@@ -335,7 +335,7 @@ def drop_create_vector_index(self, is_vector_index_recreate):
if is_vector_index_recreate == 'true':
self.graph.query("""drop index vector""")

self.graph.query("""CREATE VECTOR INDEX `vector` if not exists for (c:Chunk) on (c.embedding)
self.graph.query("""CREATE VECTOR INDEX `vector` if not exists for (c:__Chunk__) on (c.embedding)
OPTIONS {indexConfig: {
`vector.dimensions`: $dimensions,
`vector.similarity_function`: 'cosine'
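
Once the index exists on (:__Chunk__).embedding, nearest-neighbour lookups go through db.index.vector.queryNodes, the same procedure update_KNN_graph calls above. A sketch from inside this class; the zero vector is a placeholder:

# Sketch: top-5 nearest chunks for a query embedding.
query_vector = [0.0] * 384  # placeholder; must match the index dimension
knn_query = """
    CALL db.index.vector.queryNodes('vector', 5, $embedding) YIELD node, score
    RETURN node.id AS chunk_id, score ORDER BY score DESC
"""
rows = self.graph.query(knn_query, {"embedding": query_vector})
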
2 changes: 1 addition & 1 deletion backend/src/graph_query.py
@@ -162,7 +162,7 @@ def get_completed_documents(driver):
"""
Retrieves the names of all documents with the status 'Completed' from the database.
"""
docs_query = "MATCH(node:Document {status:'Completed'}) RETURN node"
docs_query = "MATCH(node:__Document__ {status:'Completed'}) RETURN node"

try:
logging.info("Executing query to retrieve completed documents.")
6 changes: 4 additions & 2 deletions backend/src/main.py
@@ -508,11 +508,13 @@ def get_labels_and_relationtypes(graph):
query = """
RETURN collect {
CALL db.labels() yield label
- WHERE NOT label IN ['Chunk','_Bloom_Perspective_']
+ WHERE NOT label IN ['__Chunk__','_Bloom_Perspective_']
+ AND NOT label STARTS WITH ('__')
+ AND NOT label ENDS WITH('__')
return label order by label limit 100 } as labels,
collect {
CALL db.relationshipTypes() yield relationshipType as type
- WHERE NOT type IN ['PART_OF', 'NEXT_CHUNK', 'HAS_ENTITY', '_Bloom_Perspective_']
+ WHERE NOT type IN ['__PART_OF__', '__NEXT_CHUNK__', '__HAS_ENTITY__', '_Bloom_Perspective_']
return type order by type LIMIT 100 } as relationshipTypes
"""
graphDb_data_Access = graphDBdataAccess(graph)
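
The two added guards hide every label wrapped in double underscores, not just __Chunk__, so any future internal label stays out of this listing without further edits. The same filter client-side, as a sketch with illustrative labels:

# Sketch: client-side equivalent of the dunder-label filter above.
labels = ["Person", "__Chunk__", "__Document__", "_Bloom_Perspective_"]
visible = [
    label for label in labels
    if label not in ("__Chunk__", "_Bloom_Perspective_")
    and not label.startswith("__")
    and not label.endswith("__")
]
print(visible)  # -> ['Person']
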
48 changes: 24 additions & 24 deletions backend/src/make_relationships.py
@@ -11,7 +11,7 @@

def merge_relationship_between_chunk_and_entites(graph: Neo4jGraph, graph_documents_chunk_chunk_Id : list):
batch_data = []
logging.info("Create HAS_ENTITY relationship between chunks and entities")
logging.info("Create __HAS_ENTITY__ relationship between chunks and entities")
chunk_node_id_set = 'id:"{}"'
for graph_doc_chunk_id in graph_documents_chunk_chunk_Id:
for node in graph_doc_chunk_id['graph_doc'].nodes:
Expand All @@ -24,14 +24,14 @@ def merge_relationship_between_chunk_and_entites(graph: Neo4jGraph, graph_docume
#node_id = node.id
#Below query is also unable to change as parametrize because we can't make parameter of Label or node type
#https://neo4j.com/docs/cypher-manual/current/syntax/parameters/
- #graph.query('MATCH(c:Chunk {'+chunk_node_id_set.format(graph_doc_chunk_id['chunk_id'])+'}) MERGE (n:'+ node.type +'{ id: "'+node_id+'"}) MERGE (c)-[:HAS_ENTITY]->(n)')
+ #graph.query('MATCH(c:Chunk {'+chunk_node_id_set.format(graph_doc_chunk_id['chunk_id'])+'}) MERGE (n:'+ node.type +'{ id: "'+node_id+'"}) MERGE (c)-[:__HAS_ENTITY__]->(n)')

if batch_data:
unwind_query = """
UNWIND $batch_data AS data
- MATCH (c:Chunk {id: data.chunk_id})
+ MATCH (c:__Chunk__ {id: data.chunk_id})
CALL apoc.merge.node([data.node_type], {id: data.node_id}) YIELD node AS n
- MERGE (c)-[:HAS_ENTITY]->(n)
+ MERGE (c)-[:__HAS_ENTITY__]->(n)
"""
graph.query(unwind_query, params={"batch_data": batch_data})
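
As the comment above notes, Cypher cannot take a label from a query parameter, which is why apoc.merge.node receives the label list at runtime instead. A self-contained sketch of that workaround; the sample row is illustrative:

# Sketch: apoc.merge.node sidesteps the no-parameterized-labels rule.
sample_batch = [
    {"chunk_id": "chunk-1", "node_type": "Person", "node_id": "Alice"},  # illustrative
]
graph.query(
    """
    UNWIND $batch_data AS data
    MATCH (c:__Chunk__ {id: data.chunk_id})
    CALL apoc.merge.node([data.node_type], {id: data.node_id}) YIELD node AS n
    MERGE (c)-[:__HAS_ENTITY__]->(n)
    """,
    params={"batch_data": sample_batch},
)
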

@@ -55,9 +55,9 @@ def update_embedding_create_vector_index(graph, chunkId_chunkDoc_list, file_name
"chunkId": row['chunk_id'],
"embeddings": embeddings_arr
})
# graph.query("""MATCH (d:Document {fileName : $fileName})
# MERGE (c:Chunk {id:$chunkId}) SET c.embedding = $embeddings
# MERGE (c)-[:PART_OF]->(d)
# graph.query("""MATCH (d:__Document__ {fileName : $fileName})
# MERGE (c:__Chunk__ {id:$chunkId}) SET c.embedding = $embeddings
# MERGE (c)-[:__PART_OF__]->(d)
# """,
# {
# "fileName" : file_name,
Expand All @@ -67,7 +67,7 @@ def update_embedding_create_vector_index(graph, chunkId_chunkDoc_list, file_name
# )
# logging.info('create vector index on chunk embedding')

graph.query("""CREATE VECTOR INDEX `vector` if not exists for (c:Chunk) on (c.embedding)
graph.query("""CREATE VECTOR INDEX `vector` if not exists for (c:__Chunk__) on (c.embedding)
OPTIONS {indexConfig: {
`vector.dimensions`: $dimensions,
`vector.similarity_function`: 'cosine'
Expand All @@ -80,10 +80,10 @@ def update_embedding_create_vector_index(graph, chunkId_chunkDoc_list, file_name

query_to_create_embedding = """
UNWIND $data AS row
- MATCH (d:Document {fileName: $fileName})
- MERGE (c:Chunk {id: row.chunkId})
+ MATCH (d:__Document__ {fileName: $fileName})
+ MERGE (c:__Chunk__ {id: row.chunkId})
SET c.embedding = row.embeddings
- MERGE (c)-[:PART_OF]->(d)
+ MERGE (c)-[:__PART_OF__]->(d)
"""
graph.query(query_to_create_embedding, params={"fileName":file_name, "data":data_for_query})

@@ -134,44 +134,44 @@ def create_relation_between_chunks(graph, file_name, chunks: List[Document])->li

# create relationships between chunks
if firstChunk:
relationships.append({"type": "FIRST_CHUNK", "chunk_id": current_chunk_id})
relationships.append({"type": "__FIRST_CHUNK__", "chunk_id": current_chunk_id})
else:
relationships.append({
"type": "NEXT_CHUNK",
"type": "__NEXT_CHUNK__",
"previous_chunk_id": previous_chunk_id, # ID of previous chunk
"current_chunk_id": current_chunk_id
})

query_to_create_chunk_and_PART_OF_relation = """
UNWIND $batch_data AS data
- MERGE (c:Chunk {id: data.id})
+ MERGE (c:__Chunk__ {id: data.id})
SET c.text = data.pg_content, c.position = data.position, c.length = data.length, c.fileName=data.f_name, c.content_offset=data.content_offset
WITH data, c
SET c.page_number = CASE WHEN data.page_number IS NOT NULL THEN data.page_number END,
c.start_time = CASE WHEN data.start_time IS NOT NULL THEN data.start_time END,
c.end_time = CASE WHEN data.end_time IS NOT NULL THEN data.end_time END
WITH data, c
- MATCH (d:Document {fileName: data.f_name})
- MERGE (c)-[:PART_OF]->(d)
+ MATCH (d:__Document__ {fileName: data.f_name})
+ MERGE (c)-[:__PART_OF__]->(d)
"""
graph.query(query_to_create_chunk_and_PART_OF_relation, params={"batch_data": batch_data})

query_to_create_FIRST_relation = """
UNWIND $relationships AS relationship
- MATCH (d:Document {fileName: $f_name})
- MATCH (c:Chunk {id: relationship.chunk_id})
- FOREACH(r IN CASE WHEN relationship.type = 'FIRST_CHUNK' THEN [1] ELSE [] END |
- MERGE (d)-[:FIRST_CHUNK]->(c))
+ MATCH (d:__Document__ {fileName: $f_name})
+ MATCH (c:__Chunk__ {id: relationship.chunk_id})
+ FOREACH(r IN CASE WHEN relationship.type = '__FIRST_CHUNK__' THEN [1] ELSE [] END |
+ MERGE (d)-[:__FIRST_CHUNK__]->(c))
"""
graph.query(query_to_create_FIRST_relation, params={"f_name": file_name, "relationships": relationships})

query_to_create_NEXT_CHUNK_relation = """
UNWIND $relationships AS relationship
- MATCH (c:Chunk {id: relationship.current_chunk_id})
+ MATCH (c:__Chunk__ {id: relationship.current_chunk_id})
WITH c, relationship
- MATCH (pc:Chunk {id: relationship.previous_chunk_id})
- FOREACH(r IN CASE WHEN relationship.type = 'NEXT_CHUNK' THEN [1] ELSE [] END |
- MERGE (c)<-[:NEXT_CHUNK]-(pc))
+ MATCH (pc:__Chunk__ {id: relationship.previous_chunk_id})
+ FOREACH(r IN CASE WHEN relationship.type = '__NEXT_CHUNK__' THEN [1] ELSE [] END |
+ MERGE (c)<-[:__NEXT_CHUNK__]-(pc))
"""
graph.query(query_to_create_NEXT_CHUNK_relation, params={"relationships": relationships})
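
Taken together, __FIRST_CHUNK__ plus the __NEXT_CHUNK__ chain give each document a linked list of chunks. A sketch of reading them back in order; the file name is a placeholder:

# Sketch: walk one document's chunk list from __FIRST_CHUNK__ onwards.
ordered_chunks_query = """
    MATCH (d:__Document__ {fileName: $f_name})-[:__FIRST_CHUNK__]->(first:__Chunk__)
    MATCH p = (first)-[:__NEXT_CHUNK__*0..]->(last:__Chunk__)
    WHERE NOT (last)-[:__NEXT_CHUNK__]->()
    RETURN [c IN nodes(p) | c.id] AS ordered_chunk_ids
"""
rows = graph.query(ordered_chunks_query, params={"f_name": "example.pdf"})
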
