Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds option to construct PGVectorStore with a HNSW index #15024

Merged
merged 5 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 67 additions & 47 deletions docs/docs/examples/vector_stores/postgres.ipynb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import re
from typing import Any, List, NamedTuple, Optional, Type, Union
from typing import Any, Dict, List, NamedTuple, Optional, Type, Union

import asyncpg # noqa
import pgvector # noqa
Expand Down Expand Up @@ -148,6 +148,8 @@ class PGVectorStore(BasePydanticVectorStore):
debug: bool
use_jsonb: bool

hnsw_kwargs: Optional[Dict[str, Any]]

_base: Any = PrivateAttr()
_table_class: Any = PrivateAttr()
_engine: Any = PrivateAttr()
Expand All @@ -169,7 +171,26 @@ def __init__(
perform_setup: bool = True,
debug: bool = False,
use_jsonb: bool = False,
hnsw_kwargs: Optional[Dict[str, Any]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we include this in the docstring? Even some small explanation of the typical parameters 🤔

It might also be good to update docs/docs/examples/vector_stores/postgres.ipynb to use this by default (imo most people should probably be using hnsw if they want the best performance)

) -> None:
"""Constructor.

Args:
connection_string (Union[str, sqlalchemy.engine.URL]): Connection string to postgres db.
async_connection_string (Union[str, sqlalchemy.engine.URL]): Connection string to async pg db.
table_name (str): Table name.
schema_name (str): Schema name.
hybrid_search (bool, optional): Enable hybrid search. Defaults to False.
text_search_config (str, optional): Text search configuration. Defaults to "english".
embed_dim (int, optional): Embedding dimensions. Defaults to 1536.
cache_ok (bool, optional): Enable cache. Defaults to False.
perform_setup (bool, optional): If db should be set up. Defaults to True.
debug (bool, optional): Debug mode. Defaults to False.
use_jsonb (bool, optional): Use JSONB instead of JSON. Defaults to False.
hnsw_kwargs (Optional[Dict[str, Any]], optional): HNSW kwargs, a dict that
contains "hnsw_ef_construction", "hnsw_ef_search", "hnsw_m", and optionally "hnsw_dist_method". Defaults to None,
which turns off HNSW search.
"""
table_name = table_name.lower()
schema_name = schema_name.lower()

Expand Down Expand Up @@ -206,6 +227,7 @@ def __init__(
perform_setup=perform_setup,
debug=debug,
use_jsonb=use_jsonb,
hnsw_kwargs=hnsw_kwargs,
)

async def close(self) -> None:
Expand Down Expand Up @@ -240,8 +262,34 @@ def from_params(
perform_setup: bool = True,
debug: bool = False,
use_jsonb: bool = False,
hnsw_kwargs: Optional[Dict[str, Any]] = None,
) -> "PGVectorStore":
"""Return connection string from database parameters."""
"""Construct from params.

Args:
host (Optional[str], optional): Host of postgres connection. Defaults to None.
port (Optional[str], optional): Port of postgres connection. Defaults to None.
database (Optional[str], optional): Postgres DB name. Defaults to None.
user (Optional[str], optional): Postgres username. Defaults to None.
password (Optional[str], optional): Postgres password. Defaults to None.
table_name (str): Table name. Defaults to "llamaindex".
schema_name (str): Schema name. Defaults to "public".
connection_string (Union[str, sqlalchemy.engine.URL]): Connection string to postgres db
async_connection_string (Union[str, sqlalchemy.engine.URL]): Connection string to async pg db
hybrid_search (bool, optional): Enable hybrid search. Defaults to False.
text_search_config (str, optional): Text search configuration. Defaults to "english".
embed_dim (int, optional): Embedding dimensions. Defaults to 1536.
cache_ok (bool, optional): Enable cache. Defaults to False.
perform_setup (bool, optional): If db should be set up. Defaults to True.
debug (bool, optional): Debug mode. Defaults to False.
use_jsonb (bool, optional): Use JSONB instead of JSON. Defaults to False.
hnsw_kwargs (Optional[Dict[str, Any]], optional): HNSW kwargs, a dict that
contains "hnsw_ef_construction", "hnsw_ef_search", "hnsw_m", and optionally "hnsw_dist_method". Defaults to None,
which turns off HNSW search.

Returns:
PGVectorStore: Instance of PGVectorStore constructed from params.
"""
conn_str = (
connection_string
or f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
Expand All @@ -261,6 +309,7 @@ def from_params(
perform_setup=perform_setup,
debug=debug,
use_jsonb=use_jsonb,
hnsw_kwargs=hnsw_kwargs,
)

@property
Expand Down Expand Up @@ -313,13 +362,37 @@ def _create_extension(self) -> None:
session.execute(statement)
session.commit()

def _create_hnsw_index(self) -> None:
import sqlalchemy

if (
"hnsw_ef_construction" not in self.hnsw_kwargs
or "hnsw_m" not in self.hnsw_kwargs
):
raise ValueError(
"Make sure hnsw_ef_search, hnsw_ef_construction, and hnsw_m are in hnsw_kwargs."
)

hnsw_ef_construction = self.hnsw_kwargs.pop("hnsw_ef_construction")
hnsw_m = self.hnsw_kwargs.pop("hnsw_m")
hnsw_dist_method = self.hnsw_kwargs.pop("hnsw_dist_method", "vector_cosine_ops")

with self._session() as session, session.begin():
statement = sqlalchemy.text(
f"CREATE INDEX ON {self.schema_name}.{self._table_class.__tablename__} USING hnsw (embedding {hnsw_dist_method}) WITH (m = {hnsw_m}, ef_construction = {hnsw_ef_construction})"
)
session.execute(statement)
session.commit()

def _initialize(self) -> None:
if not self._is_initialized:
self._connect()
if self.perform_setup:
self._create_extension()
self._create_schema_if_not_exists()
self._create_tables_if_not_exists()
if self.hnsw_kwargs is not None:
self._create_hnsw_index()
self._is_initialized = True

def _node_to_table_row(self, node: BaseNode) -> Any:
Expand Down Expand Up @@ -493,8 +566,10 @@ def _query_with_score(
text(f"SET ivfflat.probes = :ivfflat_probes"),
{"ivfflat_probes": ivfflat_probes},
)
if kwargs.get("hnsw_ef_search"):
hnsw_ef_search = kwargs.get("hnsw_ef_search")
if self.hnsw_kwargs:
hnsw_ef_search = (
kwargs.get("hnsw_ef_search") or self.hnsw_kwargs["hnsw_ef_search"]
)
session.execute(
text(f"SET hnsw.ef_search = :hnsw_ef_search"),
{"hnsw_ef_search": hnsw_ef_search},
Expand Down Expand Up @@ -524,11 +599,12 @@ async def _aquery_with_score(
async with self._async_session() as async_session, async_session.begin():
from sqlalchemy import text

if kwargs.get("hnsw_ef_search"):
hnsw_ef_search = kwargs.get("hnsw_ef_search")
if self.hnsw_kwargs:
hnsw_ef_search = (
kwargs.get("hnsw_ef_search") or self.hnsw_kwargs["hnsw_ef_search"]
)
await async_session.execute(
text(f"SET hnsw.ef_search = :hnsw_ef_search"),
{"hnsw_ef_search": hnsw_ef_search},
text(f"SET hnsw.ef_search = {hnsw_ef_search}"),
)
if kwargs.get("ivfflat_probes"):
ivfflat_probes = kwargs.get("ivfflat_probes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ exclude = ["**/BUILD"]
license = "MIT"
name = "llama-index-vector-stores-postgres"
readme = "README.md"
version = "0.1.12"
version = "0.1.13"

[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"port": 5432,
}
TEST_DB = "test_vector_db"
TEST_DB_HNSW = "test_vector_db_hnsw"
TEST_TABLE_NAME = "lorem_ipsum"
TEST_SCHEMA_NAME = "test"
TEST_EMBED_DIM = 2
Expand Down Expand Up @@ -78,6 +79,20 @@ def db(conn: Any) -> Generator:
conn.commit()


@pytest.fixture()
def db_hnsw(conn: Any) -> Generator:
    """Provision a fresh database for the HNSW tests; drop it on teardown."""
    conn.autocommit = True

    with conn.cursor() as cursor:
        cursor.execute(f"DROP DATABASE IF EXISTS {TEST_DB_HNSW}")
        cursor.execute(f"CREATE DATABASE {TEST_DB_HNSW}")
    conn.commit()

    yield

    with conn.cursor() as cursor:
        cursor.execute(f"DROP DATABASE {TEST_DB_HNSW}")
    conn.commit()


@pytest.fixture()
def pg(db: None) -> Any:
pg = PGVectorStore.from_params(
Expand Down Expand Up @@ -109,6 +124,39 @@ def pg_hybrid(db: None) -> Any:
asyncio.run(pg.close())


@pytest.fixture()
def pg_hnsw(db_hnsw: None) -> Any:
    """PGVectorStore against the HNSW test DB with an HNSW index enabled."""
    hnsw_settings = {"hnsw_m": 16, "hnsw_ef_construction": 64, "hnsw_ef_search": 40}
    store = PGVectorStore.from_params(
        **PARAMS,  # type: ignore
        database=TEST_DB_HNSW,
        table_name=TEST_TABLE_NAME,
        schema_name=TEST_SCHEMA_NAME,
        embed_dim=TEST_EMBED_DIM,
        hnsw_kwargs=hnsw_settings,
    )

    yield store

    asyncio.run(store.close())


@pytest.fixture()
def pg_hnsw_hybrid(db_hnsw: None) -> Any:
    """PGVectorStore with both hybrid (text) search and an HNSW index."""
    hnsw_settings = {"hnsw_m": 16, "hnsw_ef_construction": 64, "hnsw_ef_search": 40}
    store = PGVectorStore.from_params(
        **PARAMS,  # type: ignore
        database=TEST_DB_HNSW,
        table_name=TEST_TABLE_NAME,
        schema_name=TEST_SCHEMA_NAME,
        embed_dim=TEST_EMBED_DIM,
        hybrid_search=True,
        hnsw_kwargs=hnsw_settings,
    )

    yield store

    asyncio.run(store.close())


@pytest.fixture(scope="session")
def node_embeddings() -> List[TextNode]:
return [
Expand Down Expand Up @@ -235,6 +283,30 @@ async def test_add_to_db_and_query(
assert res.nodes[0].node_id == "aaa"


@pytest.mark.skipif(postgres_not_available, reason="postgres db is not available")
@pytest.mark.asyncio()
@pytest.mark.parametrize("use_async", [True, False])
async def test_query_hnsw(
    pg_hnsw: PGVectorStore, node_embeddings: List[TextNode], use_async: bool
):
    """Top-1 similarity query against an HNSW-indexed store returns node 'aaa'."""
    store = pg_hnsw
    if use_async:
        await store.async_add(node_embeddings)
    else:
        store.add(node_embeddings)

    assert isinstance(store, PGVectorStore)
    assert hasattr(store, "_engine")

    query = VectorStoreQuery(
        query_embedding=_get_sample_vector(1.0), similarity_top_k=1
    )
    result = await store.aquery(query) if use_async else store.query(query)

    assert result.nodes
    assert len(result.nodes) == 1
    assert result.nodes[0].node_id == "aaa"


@pytest.mark.skipif(postgres_not_available, reason="postgres db is not available")
@pytest.mark.asyncio()
@pytest.mark.parametrize("use_async", [True, False])
Expand Down Expand Up @@ -490,6 +562,78 @@ async def test_hybrid_query(
assert res.nodes[3].node_id == "ddd"


@pytest.mark.skipif(postgres_not_available, reason="postgres db is not available")
@pytest.mark.asyncio()
@pytest.mark.parametrize("use_async", [True, False])
async def test_hnsw_hybrid_query(
    pg_hnsw_hybrid: PGVectorStore,
    hybrid_node_embeddings: List[TextNode],
    use_async: bool,
) -> None:
    """Hybrid (dense + text) search against an HNSW-indexed store.

    Renamed from ``test_hybrid_query``: this module already defines a test
    with that name for the non-HNSW fixture, so the duplicate definition
    shadowed it and pytest never collected the original test.
    """

    async def _run(q: VectorStoreQuery) -> Any:
        # Dispatch to the sync or async query path per the parametrization.
        if use_async:
            return await pg_hnsw_hybrid.aquery(q)
        return pg_hnsw_hybrid.query(q)

    if use_async:
        await pg_hnsw_hybrid.async_add(hybrid_node_embeddings)
    else:
        pg_hnsw_hybrid.add(hybrid_node_embeddings)
    assert isinstance(pg_hnsw_hybrid, PGVectorStore)
    assert hasattr(pg_hnsw_hybrid, "_engine")

    # Explicit sparse_top_k=1: dense leg returns 2 nodes, text leg adds 1.
    res = await _run(
        VectorStoreQuery(
            query_embedding=_get_sample_vector(0.1),
            query_str="fox",
            similarity_top_k=2,
            mode=VectorStoreQueryMode.HYBRID,
            sparse_top_k=1,
        )
    )
    assert res.nodes
    assert [node.node_id for node in res.nodes] == ["aaa", "bbb", "ccc"]

    # If sparse_top_k is not specified, it should default to similarity_top_k.
    res = await _run(
        VectorStoreQuery(
            query_embedding=_get_sample_vector(0.1),
            query_str="fox",
            similarity_top_k=2,
            mode=VectorStoreQueryMode.HYBRID,
        )
    )
    assert res.nodes
    assert [node.node_id for node in res.nodes] == ["aaa", "bbb", "ccc", "ddd"]

    # Text search should work when the query is a sentence, not a single word.
    res = await _run(
        VectorStoreQuery(
            query_embedding=_get_sample_vector(0.1),
            query_str="who is the fox?",
            similarity_top_k=2,
            mode=VectorStoreQueryMode.HYBRID,
        )
    )
    assert res.nodes
    assert [node.node_id for node in res.nodes] == ["aaa", "bbb", "ccc", "ddd"]


@pytest.mark.skipif(postgres_not_available, reason="postgres db is not available")
@pytest.mark.asyncio()
@pytest.mark.parametrize("use_async", [True, False])
Expand Down
Loading