Skip to content

Display indexing progress when building a pgvector index #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 196 additions & 9 deletions ann_benchmarks/algorithms/pgvector/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
target database, if it has not been already created.
"""

import os
import subprocess
import sys
import os
import threading
import time

import pgvector.psycopg
import psycopg
Expand All @@ -35,6 +37,19 @@
from ...util import get_bool_env_var


METRIC_PROPERTIES = {
"angular": {
"distance_operator": "<=>",
# A substring of e.g. vector_cosine_ops or halfvec_cosine_ops
"ops_type": "cosine",
},
"euclidean": {
"distance_operator": "<->",
"ops_type": "l2",
}
}


def get_pg_param_env_var_name(pg_param_name: str) -> str:
return f'ANN_BENCHMARKS_PG_{pg_param_name.upper()}'

Expand All @@ -49,6 +64,148 @@ def get_pg_conn_param(
return env_var_value


class IndexingProgressMonitor:
"""
Continuously logs indexing progress, elapsed and estimated remaining
indexing time.
"""

MONITORING_DELAY_SEC = 0.5

def __init__(self, psycopg_connect_kwargs: Dict[str, str]) -> None:
self.psycopg_connect_kwargs = psycopg_connect_kwargs
self.monitoring_condition = threading.Condition()
self.stop_requested = False
self.psycopg_connect_kwargs = psycopg_connect_kwargs
self.prev_phase = None
self.prev_progress_pct = None
self.prev_tuples_done = None
self.prev_report_time_sec = None
self.time_to_load_all_tuples_sec = None
self._ef_search = None

def report_progress(
self,
phase: str,
progress_pct: Any,
tuples_done: Any) -> None:
if progress_pct is None:
progress_pct = 0.0
progress_pct = float(progress_pct)
if tuples_done is None:
tuples_done = 0
tuples_done = int(tuples_done)
if (phase == self.prev_phase and
progress_pct == self.prev_progress_pct):
return
time_now_sec = time.time()

elapsed_time_sec = time_now_sec - self.indexing_start_time_sec
fields = [
f"Phase: {phase}",
f"progress: {progress_pct:.1f}%",
f"elapsed time: {elapsed_time_sec:.3f} sec"
]
if (self.prev_report_time_sec is not None and
self.prev_tuples_done is not None and
elapsed_time_sec):
overall_tuples_per_sec = tuples_done / elapsed_time_sec
fields.append(
f"overall tuples/sec: {overall_tuples_per_sec:.2f}")

time_since_last_report_sec = time_now_sec - self.prev_report_time_sec
if time_since_last_report_sec > 0:
cur_tuples_per_sec = ((tuples_done - self.prev_tuples_done) /
time_since_last_report_sec)
fields.append(
f"current tuples/sec: {cur_tuples_per_sec:.2f}")

remaining_pct = 100 - progress_pct
if progress_pct > 0 and remaining_pct > 0:
estimated_remaining_time_sec = \
elapsed_time_sec / progress_pct * remaining_pct
estimated_total_time_sec = \
elapsed_time_sec + estimated_remaining_time_sec
fields.extend([
"estimated remaining time: " \
f"{estimated_remaining_time_sec:.3f} sec" ,
f"estimated total time: {estimated_total_time_sec:.3f} sec"
])
print(", ".join(fields))
sys.stdout.flush()

self.prev_progress_pct = progress_pct
self.prev_phase = phase
self.prev_tuples_done = tuples_done
self.prev_report_time_sec = time_now_sec

def monitoring_loop_impl(self, monitoring_cur) -> None:
while True:
# Indexing progress query taken from
# https://github.com/pgvector/pgvector/blob/master/README.md
monitoring_cur.execute(
"SELECT phase, " +
"round(100.0 * blocks_done / nullif(blocks_total, 0), 1), " +
"tuples_done " +
"FROM pg_stat_progress_create_index");
result_rows = monitoring_cur.fetchall()

if len(result_rows) == 1:
phase, progress_pct, tuples_done = result_rows[0]
self.report_progress(phase, progress_pct, tuples_done)
if (self.time_to_load_all_tuples_sec is None and
phase == 'building index: loading tuples' and
progress_pct is not None and
float(progress_pct) > 100.0 - 1e-7):
# Even after pgvector reports progress as 100%, it still spends
# some time postprocessing the index and writing it to disk.
# We keep track of the the time it takes to reach 100%
# separately.
self.time_to_load_all_tuples_sec = \
time.time() - self.indexing_start_time_sec
elif len(result_rows) > 0:
# This should not happen.
print(f"Expected exactly one progress result row, got: {result_rows}")
with self.monitoring_condition:
if self.stop_requested:
return
self.monitoring_condition.wait(
timeout=self.MONITORING_DELAY_SEC)
if self.stop_requested:
return

def monitor_progress(self) -> None:
prev_phase = None
prev_progress_pct = None
with psycopg.connect(**self.psycopg_connect_kwargs) as monitoring_conn:
with monitoring_conn.cursor() as monitoring_cur:
self.monitoring_loop_impl(monitoring_cur)

def start_monitoring_thread(self) -> None:
self.indexing_start_time_sec = time.time()
self.monitoring_thread = threading.Thread(target=self.monitor_progress)
self.monitoring_thread.start()

def stop_monitoring_thread(self) -> None:
with self.monitoring_condition:
self.stop_requested = True
self.monitoring_condition.notify_all()
self.monitoring_thread.join()
self.indexing_time_sec = time.time() - self.indexing_start_time_sec

def report_timings(self) -> None:
print(f"pgvector total indexing time: {self.indexing_time_sec:3f} sec")
if self.time_to_load_all_tuples_sec is not None:
print(" Time to load all tuples into the index: {:.3f} sec".format(
self.time_to_load_all_tuples_sec
))
postprocessing_time_sec = \
self.indexing_time_sec - self.time_to_load_all_tuples_sec
print(" Index postprocessing time: {:.3f} sec".format(
postprocessing_time_sec))
else:
print(" Detailed breakdown of indexing time not available.")

class PGVector(BaseANN):
def __init__(self, metric, method_param):
self._metric = metric
Expand All @@ -63,6 +220,21 @@ def __init__(self, metric, method_param):
else:
raise RuntimeError(f"unknown metric {metric}")

def get_metric_properties(self) -> Dict[str, str]:
"""
Get properties of the metric type associated with this index.

Returns:
A dictionary with keys distance_operator and ops_type.
"""
if self._metric not in METRIC_PROPERTIES:
raise ValueError(
"Unknown metric: {}. Valid metrics: {}".format(
self._metric,
', '.join(sorted(METRIC_PROPERTIES.keys()))
))
return METRIC_PROPERTIES[self._metric]

def ensure_pgvector_extension_created(self, conn: psycopg.Connection) -> None:
"""
Ensure that `CREATE EXTENSION vector` has been executed.
Expand Down Expand Up @@ -124,23 +296,38 @@ def fit(self, X):
cur.execute("CREATE TABLE items (id int, embedding vector(%d))" % X.shape[1])
cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN")
print("copying data...")
sys.stdout.flush()
num_rows = 0
insert_start_time_sec = time.time()
with cur.copy("COPY items (id, embedding) FROM STDIN WITH (FORMAT BINARY)") as copy:
copy.set_types(["int4", "vector"])
for i, embedding in enumerate(X):
copy.write_row((i, embedding))
num_rows += 1
insert_elapsed_time_sec = time.time() - insert_start_time_sec
print("inserted {} rows into table in {:.3f} seconds".format(
num_rows, insert_elapsed_time_sec))

print("creating index...")
if self._metric == "angular":
cur.execute(
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = %d, ef_construction = %d)" % (self._m, self._ef_construction)
sys.stdout.flush()
create_index_str = \
"CREATE INDEX ON items USING hnsw (embedding vector_%s_ops) " \
"WITH (m = %d, ef_construction = %d)" % (
self.get_metric_properties()["ops_type"],
self._m,
self._ef_construction
)
elif self._metric == "euclidean":
cur.execute("CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = %d, ef_construction = %d)" % (self._m, self._ef_construction))
else:
raise RuntimeError(f"unknown metric {self._metric}")
progress_monitor = IndexingProgressMonitor(psycopg_connect_kwargs)
progress_monitor.start_monitoring_thread()

try:
cur.execute(create_index_str)
finally:
progress_monitor.stop_monitoring_thread()
print("done!")
progress_monitor.report_timings()
self._cur = cur


def set_query_arguments(self, ef_search):
self._ef_search = ef_search
self._cur.execute("SET hnsw.ef_search = %d" % ef_search)
Expand Down
Loading