Add more filter types in query for relyt vector database #13512

Open
wants to merge 6 commits into base: main
Changes from 2 commits
@@ -1,19 +1,19 @@
import logging
from typing import Any, List
from typing import Any, List, Sequence

from llama_index.core.bridge.pydantic import PrivateAttr
from llama_index.core.schema import BaseNode, MetadataMode
from llama_index.core.vector_stores.types import (
BasePydanticVectorStore,
VectorStoreQuery,
VectorStoreQueryResult,
VectorStoreQueryResult,
FilterCondition,
MetadataFilters,
FilterOperator,
)
from llama_index.core.vector_stores.utils import (
metadata_dict_to_node,
node_to_metadata_dict,
)
from pgvecto_rs.sdk import PGVectoRs, Record
from pgvecto_rs.sdk.filters import meta_contains
from pydantic import StrictStr
from sqlalchemy import text

logger = logging.getLogger(__name__)
@@ -59,9 +59,10 @@ class RelytVectorStore(BasePydanticVectorStore):
_client: "PGVectoRs" = PrivateAttr()
_collection_name: str = PrivateAttr()

def __init__(self, client: "PGVectoRs", collection_name: str) -> None:
def __init__(self, client: "PGVectoRs", collection_name: str, enable_vector_index: bool) -> None:
Collaborator
should this have a default option? Is there a reason to set this to false?

Contributor Author
Okay, thanks for the review. I will add more features to this later; when this PR is ready, I will invite you to review it again.
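For reference, a minimal sketch of what a defaulted parameter could look like, assuming `True` is the intended default (this PR currently leaves the argument required):

def __init__(
    self,
    client: "PGVectoRs",
    collection_name: str,
    enable_vector_index: bool = True,  # hypothetical default, pending the author's follow-up
) -> None:
    ...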

self._client: PGVectoRs = client
self._collection_name = collection_name
self._enable_vector_index = enable_vector_index
self.init_index()
super().__init__()

@@ -78,25 +79,36 @@ def init_index(self):
SELECT 1
FROM pg_indexes
WHERE indexname = '{index_name}';
"""
)
""")
result = conn.execute(index_query).scalar()
if not result:
if not result and self._enable_vector_index:
index_statement = text(
f"""
CREATE INDEX {index_name}
ON collection_{self._collection_name}
USING vectors (embedding vector_l2_ops)
WITH (options = $$
optimizing.optimizing_threads = 30
optimizing.optimizing_threads = 10
segment.max_growing_segment_size = 2000
segment.max_sealed_segment_size = 30000000
[indexing.hnsw]
m=30
ef_construction=500
$$);
"""
)
""")
conn.execute(index_statement)
index_name = f"meta_{self._collection_name}_embedding"
index_query = text(
f"""
SELECT 1
FROM pg_indexes
WHERE indexname = '{index_name}';
"""
)
result = conn.execute(index_query).scalar()
if not result:
index_statement = text(
f""" CREATE INDEX {index_name} ON collection_{self._collection_name} USING gin (meta); """)
conn.execute(index_statement)

@property
@@ -120,34 +132,114 @@ def add(
self._client.insert(records)
return [node.id_ for node in nodes]

def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
self._client.delete(meta_contains({"ref_doc_id": ref_doc_id}))
def delete(self, filters: str, **delete_kwargs: Any) -> None:
if filters is None:
raise ValueError("filters cannot be None")

filter_condition = f"WHERE {filters}"

with self._client._engine.connect() as conn:
with conn.begin():
sql_query = f""" DELETE FROM collection_{self._collection_name} {filter_condition}"""
conn.execute(text(sql_query))
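A hedged usage sketch for the new delete signature; the predicate below assumes meta is a JSONB column holding node metadata, which is an assumption about the schema rather than something stated in this diff:

# Illustrative only: remove every row whose metadata ref_doc_id is "doc-1".
# Assumes `store` is an already-initialized RelytVectorStore.
store.delete("meta->>'ref_doc_id' = 'doc-1'")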

def drop(self) -> None:
self._client.drop()

# TODO: more filter types (e.g. contains, nin) can be added later, once the base API supports them;
# currently eq, ne, gt, gte, lt, lte, and in filters on metadata are supported.
def to_postgres_operator(self, operator: FilterOperator) -> str:
if operator == FilterOperator.EQ:
return " = "
elif operator == FilterOperator.GT:
return " > "
elif operator == FilterOperator.LT:
return " < "
elif operator == FilterOperator.NE:
return " != "
elif operator == FilterOperator.GTE:
return " >= "
elif operator == FilterOperator.LTE:
return " <= "
elif operator == FilterOperator.IN:
return " in "
else:
raise ValueError(f"Unsupported filter operator: {operator}")

def to_postgres_conditions(self, condition: FilterCondition) -> str:
if condition == FilterCondition.AND:
return " AND "
elif condition == FilterCondition.OR:
return " OR "
else:
raise ValueError(f"Unsupported filter condition: {condition}")

def transformer_filter(self, filters: MetadataFilters) -> str:
filter_statement = ""
for filter in filters.filters:
if isinstance(filter, MetadataFilters):
f_stmt = "(" + self.transformer_filter(filter) + ")"
if filter_statement == "":
filter_statement = f_stmt
else:
filter_statement += self.to_postgres_conditions(filters.condition) + f_stmt
else:
key = filter.key
value = filter.value
op = filter.operator
if isinstance(value, str):
value = "'{}'".format(value)
elif not isinstance(value, (list, tuple)):
value = str(value)
if op == FilterOperator.IN:
new_val = []
for v in value:
if isinstance(v, str):
new_val.append("'{}'".format(v))
else:
new_val.append(str(v))
value = "(" + ",".join(new_val) + ")"
filter_cond = key + self.to_postgres_operator(op) + value
if filter_statement == "":
filter_statement = filter_cond
else:
filter_statement += self.to_postgres_conditions(filters.condition) + filter_cond
return filter_statement
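As a worked example (not part of the diff), the hypothetical filter below, built from MetadataFilter objects in llama_index.core.vector_stores.types, would be rendered by transformer_filter into a plain SQL predicate:

# MetadataFilters(
#     filters=[
#         MetadataFilter(key="year", value=2023, operator=FilterOperator.GTE),
#         MetadataFilter(key="author", value="alice", operator=FilterOperator.EQ),
#     ],
#     condition=FilterCondition.AND,
# )
# expected output: "year >= 2023 AND author = 'alice'"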

def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
results = self._client.search(
embedding=query.query_embedding,
top_k=query.similarity_top_k,
filter=(
meta_contains(
{pair.key: pair.value for pair in query.filters.legacy_filters()}
)
if query.filters is not None
else None
),
)
# Row is used to type the rows fetched from the collection table.
try:
from sqlalchemy.engine import Row
except ImportError:
raise ImportError(
"Could not import Row from sqlalchemy.engine. "
"Please 'pip install sqlalchemy>=1.4'."
)

embedding = query.query_embedding
k = query.similarity_top_k
filter_condition = ""
filters = query.filters

if filters is not None:
filter_condition += f"WHERE {self.transformer_filter(filters)}"

sql_query = f"""
SELECT id, text, meta, embedding <-> :embedding as distance
FROM collection_{self._collection_name}
{filter_condition}
ORDER BY embedding <-> :embedding
LIMIT :k
"""

# Set up the query parameters
embedding_str = ", ".join(format(x) for x in embedding)
embedding_str = "[" + embedding_str + "]"
params = {"embedding": embedding_str, "k": k}

# Execute the query and fetch the results
with self._client._engine.connect() as conn:
results: Sequence[Row] = conn.execute(text(sql_query), params).fetchall()

nodes = [
metadata_dict_to_node(record.meta, text=record.text)
for record, _ in results
metadata_dict_to_node(record.meta, text=record.text)
for record in results
]

return VectorStoreQueryResult(
nodes=nodes,
similarities=[score for _, score in results],
ids=[str(record.id) for record, _ in results],
similarities=[r.distance for r in results],
ids=[str(r.id) for r in results],
)
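Finally, a hedged end-to-end sketch of how the extended filters might be used with this store; the connection URL, dimension, and metadata keys are illustrative assumptions, not taken from this PR:

from pgvecto_rs.sdk import PGVectoRs
from llama_index.core.vector_stores.types import (
    FilterCondition,
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
    VectorStoreQuery,
)

# Hypothetical connection details; adjust for a real Relyt / pgvecto.rs deployment.
client = PGVectoRs(
    db_url="postgresql+psycopg://user:pass@localhost:5432/db",
    collection_name="demo",
    dimension=1536,
)
store = RelytVectorStore(client=client, collection_name="demo", enable_vector_index=True)

# Combine several of the newly supported operators in one metadata filter.
filters = MetadataFilters(
    filters=[
        MetadataFilter(key="year", value=2020, operator=FilterOperator.GTE),
        MetadataFilter(key="category", value=["news", "blog"], operator=FilterOperator.IN),
    ],
    condition=FilterCondition.AND,
)
result = store.query(
    VectorStoreQuery(query_embedding=[0.1] * 1536, similarity_top_k=5, filters=filters)
)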