Skip to content

Commit

Permalink
Add indexed properties to neo4j enhanced schema (#21335)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasonjo authored May 6, 2024
1 parent a6cdf65 commit ac14f17
Showing 1 changed file with 176 additions and 124 deletions.
300 changes: 176 additions & 124 deletions libs/community/langchain_community/graphs/neo4j_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,123 +142,6 @@ def _get_rel_import_query(baseEntityLabel: bool) -> str:
)


def _enhanced_schema_cypher(
    label_or_type: str,
    properties: List[Dict[str, Any]],
    exhaustive: bool,
    is_relationship: bool = False,
) -> str:
    """Build the Cypher query that profiles the properties of a label or type.

    Args:
        label_or_type: Node label, or relationship type when
            ``is_relationship`` is true. Interpolated into the query as-is.
        properties: Property descriptors; each is read through its
            ``"property"`` (name) and ``"type"`` keys.
        exhaustive: When true, aggregate over every matched entity. When
            false, aggregate only over the first 5 entities the match returns.
        is_relationship: Match ``()-[n:TYPE]->()`` instead of ``(n:LABEL)``.

    Returns:
        A Cypher query with a single ``output`` column holding a map keyed by
        property name. STRING properties report (truncated) distinct values,
        numeric/temporal ones report min/max/distinct count (or sampled values
        when not exhaustive), and LIST properties report min/max size.
        Properties of any other type are left out of the map. When no property
        can be profiled the query is just ``RETURN {} AS output``.
    """
    if is_relationship:
        match_clause = f"MATCH ()-[n:{label_or_type}]->()"
    else:
        match_clause = f"MATCH (n:{label_or_type})"
    if not exhaustive:
        # Profile a small sample instead of scanning everything.
        match_clause += " WITH n LIMIT 5"

    range_types = ("INTEGER", "FLOAT", "DATE", "DATE_TIME", "LOCAL_DATE_TIME")
    with_clauses: List[str] = []
    output_dict: Dict[str, str] = {}

    for prop in properties:
        prop_name = prop["property"]
        prop_type = prop["type"]

        if prop_type == "STRING":
            # Values are cut to 50 characters to keep the schema compact.
            with_clauses.append(
                f"collect(distinct substring(n.`{prop_name}`, 0, 50)) "
                f"AS `{prop_name}_values`"
            )
            if exhaustive:
                summary = (
                    f"values:`{prop_name}_values`[..{DISTINCT_VALUE_LIMIT}],"
                    f" distinct_count: size(`{prop_name}_values`)"
                )
            else:
                summary = f"values: `{prop_name}_values`"
        elif prop_type in range_types:
            if exhaustive:
                with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")
                with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")
                with_clauses.append(
                    f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`"
                )
                summary = (
                    f"min: toString(`{prop_name}_min`), "
                    f"max: toString(`{prop_name}_max`), "
                    f"distinct_count: `{prop_name}_distinct`"
                )
            else:
                with_clauses.append(
                    f"collect(distinct toString(n.`{prop_name}`)) "
                    f"AS `{prop_name}_values`"
                )
                summary = f"values: `{prop_name}_values`"
        elif prop_type == "LIST":
            with_clauses.append(
                f"min(size(n.`{prop_name}`)) AS `{prop_name}_size_min`, "
                f"max(size(n.`{prop_name}`)) AS `{prop_name}_size_max`"
            )
            if exhaustive:
                summary = (
                    f"min_size: `{prop_name}_size_min`, "
                    f"max_size: `{prop_name}_size_max`"
                )
            else:
                summary = (
                    f"min_size: `{prop_name}_size_min`,"
                    f"max_size: `{prop_name}_size_max`"
                )
        else:
            # BOOLEAN, POINT, DURATION and any other type are not profiled.
            # Skipping here (rather than falling through) avoids popping from
            # an empty clause list for types this function does not know.
            continue

        output_dict[prop_name] = "{" + summary + "}"

    return_clause = (
        "RETURN {"
        + ", ".join(f"`{k}`: {v}" for k, v in output_dict.items())
        + "} AS output"
    )
    if not with_clauses:
        # A bare "WITH" is not valid Cypher. With nothing to aggregate the
        # RETURN is self-contained and still yields exactly one row.
        return return_clause

    with_clause = "WITH " + ",\n ".join(with_clauses)
    # Combine all parts of the Cypher query
    return "\n".join([match_clause, with_clause, return_clause])


def _format_schema(schema: Dict, is_enhanced: bool) -> str:
formatted_node_props = []
formatted_rel_props = []
Expand Down Expand Up @@ -296,17 +179,19 @@ def _format_schema(schema: Dict, is_enhanced: bool) -> str:
example = f'Min: {prop["min"]}, Max: {prop["max"]}'
else:
example = (
f'Example: "{prop["values"][0]}"' if prop["values"] else ""
f'Example: "{prop["values"][0]}"'
if prop.get("values")
else ""
)
elif prop["type"] == "LIST":
# Skip embeddings
if prop["min_size"] > LIST_LIMIT:
if not prop.get("min_size") or prop["min_size"] > LIST_LIMIT:
continue
example = (
f'Min Size: {prop["min_size"]}, Max Size: {prop["max_size"]}'
)
formatted_node_props.append(
f" - `{prop['property']}`: {prop['type']}` {example}"
f" - `{prop['property']}`: {prop['type']} {example}"
)

# Enhanced formatting for relationships
Expand Down Expand Up @@ -541,7 +426,11 @@ def refresh_schema(self) -> None:
# Get constraints & indexes
try:
constraint = self.query("SHOW CONSTRAINTS")
index = self.query("SHOW INDEXES YIELD *")
index = self.query(
"CALL apoc.schema.nodes() YIELD label, properties, type, size, "
"valuesSelectivity WHERE type = 'RANGE' RETURN *, "
"size * valuesSelectivity as distinctValues"
)
except (
ClientError
): # Read-only user might not have access to schema information
Expand All @@ -554,7 +443,6 @@ def refresh_schema(self) -> None:
"relationships": relationships,
"metadata": {"constraint": constraint, "index": index},
}

if self._enhanced_schema:
schema_counts = self.query(
"CALL apoc.meta.graphSample() YIELD nodes, relationships "
Expand All @@ -570,7 +458,7 @@ def refresh_schema(self) -> None:
node_props = self.structured_schema["node_props"].get(node["name"])
if not node_props: # The node has no properties
continue
enhanced_cypher = _enhanced_schema_cypher(
enhanced_cypher = self._enhanced_schema_cypher(
node["name"], node_props, node["count"] < EXHAUSTIVE_SEARCH_LIMIT
)
enhanced_info = self.query(enhanced_cypher)[0]["output"]
Expand All @@ -585,7 +473,7 @@ def refresh_schema(self) -> None:
rel_props = self.structured_schema["rel_props"].get(rel["name"])
if not rel_props: # The rel has no properties
continue
enhanced_cypher = _enhanced_schema_cypher(
enhanced_cypher = self._enhanced_schema_cypher(
rel["name"],
rel_props,
rel["count"] < EXHAUSTIVE_SEARCH_LIMIT,
Expand Down Expand Up @@ -676,3 +564,167 @@ def add_graph_documents(
]
},
)

def _enhanced_schema_cypher(
    self,
    label_or_type: str,
    properties: List[Dict[str, Any]],
    exhaustive: bool,
    is_relationship: bool = False,
) -> str:
    """Build the Cypher query that profiles the properties of a label or type.

    Args:
        label_or_type: Node label, or relationship type when
            ``is_relationship`` is true. Interpolated into the query as-is.
        properties: Property descriptors; each is read through its
            ``"property"`` (name) and ``"type"`` keys.
        exhaustive: When true, aggregate over every matched entity. When
            false, aggregate only over the first 5 entities the match
            returns, except where a RANGE index on the property allows more.
        is_relationship: Match ``()-[n:TYPE]->()`` instead of ``(n:LABEL)``.

    Returns:
        A Cypher query with a single ``output`` column holding a map keyed by
        property name. Properties whose type is not STRING, LIST or one of
        the numeric/temporal types are left out of the map. When nothing
        needs aggregating the query is a self-contained ``RETURN``.

    Note:
        In sampled mode this reads ``self.structured_schema["metadata"]
        ["index"]`` and, for an indexed STRING node property with few enough
        distinct values, runs ``apoc.schema.properties.distinct`` through
        ``self.query`` and inlines the result into the returned query.
    """
    if is_relationship:
        match_clause = f"MATCH ()-[n:{label_or_type}]->()"
    else:
        match_clause = f"MATCH (n:{label_or_type})"
    if not exhaustive:
        # Profile a small sample instead of scanning everything.
        match_clause += " WITH n LIMIT 5"

    range_types = ("INTEGER", "FLOAT", "DATE", "DATE_TIME", "LOCAL_DATE_TIME")
    with_clauses: List[str] = []
    output_dict: Dict[str, str] = {}

    for prop in properties:
        prop_name = prop["property"]
        prop_type = prop["type"]

        if prop_type not in ("STRING", "LIST") and prop_type not in range_types:
            # BOOLEAN, POINT, DURATION and any other type are not profiled.
            # Skipping here (rather than falling through) avoids popping from
            # an empty clause list for types this function does not know.
            continue

        # Check if indexed property, we can still do exhaustive.
        # The index metadata describes node labels only, so it must not be
        # applied to a relationship type that happens to share a label name.
        prop_index: List[Dict[str, Any]] = []
        if not exhaustive and not is_relationship:
            prop_index = [
                el
                for el in self.structured_schema["metadata"]["index"]
                if el["label"] == label_or_type
                and el["properties"] == [prop_name]
                and el["type"] == "RANGE"
            ]

        if prop_type == "STRING":
            index_stats = prop_index[0] if prop_index else {}
            # Missing or null statistics mean "unknown": fall back to sampling
            # instead of comparing None with a number.
            index_size = index_stats.get("size") or 0
            distinct_estimate = index_stats.get("distinctValues")
            if (
                index_size > 0
                and distinct_estimate is not None
                and distinct_estimate <= DISTINCT_VALUE_LIMIT
            ):
                distinct_values = self.query(
                    f"CALL apoc.schema.properties.distinct("
                    f"'{label_or_type}', '{prop_name}') YIELD value"
                )[0]["value"]
                # NOTE(review): the Python list repr is inlined as a Cypher
                # list literal; values with unusual escapes may need care.
                summary = (
                    f"values: {distinct_values},"
                    f" distinct_count: {len(distinct_values)}"
                )
            else:
                # Values are cut to 50 characters to keep the schema compact.
                with_clauses.append(
                    f"collect(distinct substring(n.`{prop_name}`, 0, 50)) "
                    f"AS `{prop_name}_values`"
                )
                if exhaustive:
                    summary = (
                        f"values:`{prop_name}_values`[..{DISTINCT_VALUE_LIMIT}],"
                        f" distinct_count: size(`{prop_name}_values`)"
                    )
                else:
                    summary = f"values: `{prop_name}_values`"
        elif prop_type in range_types:
            if exhaustive or prop_index:
                with_clauses.append(f"min(n.`{prop_name}`) AS `{prop_name}_min`")
                with_clauses.append(f"max(n.`{prop_name}`) AS `{prop_name}_max`")
                with_clauses.append(
                    f"count(distinct n.`{prop_name}`) AS `{prop_name}_distinct`"
                )
                summary = (
                    f"min: toString(`{prop_name}_min`), "
                    f"max: toString(`{prop_name}_max`), "
                    f"distinct_count: `{prop_name}_distinct`"
                )
            else:
                with_clauses.append(
                    f"collect(distinct toString(n.`{prop_name}`)) "
                    f"AS `{prop_name}_values`"
                )
                summary = f"values: `{prop_name}_values`"
        else:  # LIST
            with_clauses.append(
                f"min(size(n.`{prop_name}`)) AS `{prop_name}_size_min`, "
                f"max(size(n.`{prop_name}`)) AS `{prop_name}_size_max`"
            )
            summary = (
                f"min_size: `{prop_name}_size_min`, "
                f"max_size: `{prop_name}_size_max`"
            )

        output_dict[prop_name] = "{" + summary + "}"

    return_clause = (
        "RETURN {"
        + ", ".join(f"`{k}`: {v}" for k, v in output_dict.items())
        + "} AS output"
    )
    if not with_clauses:
        # A bare "WITH" is not valid Cypher. This happens when every profiled
        # property was answered from an index (or none could be profiled);
        # the RETURN is then self-contained and yields exactly one row.
        return return_clause

    with_clause = "WITH " + ",\n ".join(with_clauses)
    # Combine all parts of the Cypher query
    return "\n".join([match_clause, with_clause, return_clause])

0 comments on commit ac14f17

Please sign in to comment.