forked from drskennedy/cass_kg_rag
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_retriever.py
43 lines (34 loc) · 1.53 KB
/
custom_retriever.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from llama_index.core import QueryBundle
# import NodeWithScore
from llama_index.core.schema import NodeWithScore
# Retrievers
from llama_index.core.retrievers import BaseRetriever, VectorIndexRetriever, KGTableRetriever
from typing import List
class CustomRetriever(BaseRetriever):
    """Custom retriever that performs both Vector and Knowledge Graph searches.

    Both wrapped retrievers are run for every query and their results are
    merged by node id:

    * ``"AND"`` keeps only nodes returned by both retrievers.
    * ``"OR"`` keeps nodes returned by either retriever.

    When both retrievers return a node with the same id, the knowledge-graph
    entry (and therefore its score) is the one that is kept.
    """

    def __init__(
        self,
        vector_retriever: VectorIndexRetriever,
        kg_retriever: KGTableRetriever,
        mode: str = "OR",
    ) -> None:
        """Store the two retrievers and the merge mode.

        Args:
            vector_retriever: Retriever queried for the vector-search hits.
            kg_retriever: Retriever queried for the knowledge-graph hits.
            mode: How to combine the two result sets, ``"AND"`` or ``"OR"``.

        Raises:
            ValueError: If ``mode`` is neither ``"AND"`` nor ``"OR"``.
        """
        # Reject a bad mode before storing anything.
        if mode not in ("AND", "OR"):
            raise ValueError("Invalid mode.")
        self._vector_retriever = vector_retriever
        self._kg_retriever = kg_retriever
        self._mode = mode
        # Let BaseRetriever initialise its own state; the original omitted
        # this call, leaving the base class only partially set up.
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes for the given query from both retrievers and merge.

        Args:
            query_bundle: The query passed unchanged to both retrievers.

        Returns:
            The merged nodes in a deterministic order: nodes seen by the
            vector retriever first (in the order it returned them), followed
            by nodes only the knowledge-graph retriever returned (in its
            order). In ``"AND"`` mode the list is filtered down to ids present
            in both result sets.
        """
        vector_nodes = self._vector_retriever.retrieve(query_bundle)
        kg_nodes = self._kg_retriever.retrieve(query_bundle)

        # A dict keeps first-insertion order for its keys, so this yields
        # vector ids first and then KG-only ids. Updating with the KG results
        # replaces the value for shared ids, i.e. the KG entry wins.
        combined = {n.node.node_id: n for n in vector_nodes}
        combined.update({n.node.node_id: n for n in kg_nodes})

        if self._mode == "AND":
            vector_ids = {n.node.node_id for n in vector_nodes}
            kg_ids = {n.node.node_id for n in kg_nodes}
            shared_ids = vector_ids & kg_ids
            # Walk the ordered dict rather than the set: iterating a set of
            # string ids gives an order that can change between runs.
            return [
                node for node_id, node in combined.items()
                if node_id in shared_ids
            ]

        # "OR": the union of both id sets is exactly the dict's key set.
        return list(combined.values())