Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 86 additions & 20 deletions bench/bench-vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,40 +63,57 @@ def name(self) -> str:


class EmberClient(VectorClient):
    """ember vector client using redis-py for RESP command execution.

    uses binary mode (no decode_responses) to avoid unnecessary UTF-8
    decoding overhead on responses we don't inspect during inserts.

    supports optional key sharding (distribute vectors across several
    keys to leverage a thread-per-core server) and records the pipeline
    depth so it can be reflected in the benchmark name.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 6379,
                 shards: int = 1, pipeline_depth: int = 1):
        # local import so this module stays importable when benchmarking
        # other systems that don't need redis-py installed
        import redis
        self.conn = redis.Redis(host=host, port=port)
        self.shards = shards
        self.pipeline_depth = pipeline_depth
        # one key per shard; single-shard keeps the legacy key name so
        # results stay comparable with earlier runs
        self.keys = [f"bench_vectors_{i}" for i in range(shards)] if shards > 1 else ["bench_vectors"]

    def _key_for_batch(self, batch_idx: int) -> str:
        # round-robin batches across shard keys
        return self.keys[batch_idx % len(self.keys)]

    def setup(self, dim: int, metric: str = "cosine"):
        # clear any previous data on every shard key
        for key in self.keys:
            self.conn.delete(key)

    def insert_batch(self, ids: list, vectors: np.ndarray, key: str = None):
        """insert a batch of vectors via VADD_BATCH.

        key: target shard key; defaults to the first (only) key when
        sharding is disabled.
        """
        target = key or self.keys[0]
        dim = vectors.shape[1]
        args = [target, "DIM", str(dim)]
        for i, vid in enumerate(ids):
            args.append(vid)
            args.extend(str(float(v)) for v in vectors[i])
        args += ["METRIC", "COSINE", "M", "16", "EF", "64"]
        self.conn.execute_command("VADD_BATCH", *args)

    def query(self, vector: np.ndarray, k: int) -> list:
        # NOTE(review): queries only hit the first shard key — with
        # shards > 1 recall is measured against a partial index; confirm
        # this is intended before comparing sharded query numbers
        args = [self.keys[0]] + [str(float(v)) for v in vector]
        args += ["COUNT", str(k)]
        result = self.conn.execute_command("VSIM", *args)
        if result is None:
            return []
        # binary mode: decode ids returned as bytes
        return [r.decode() if isinstance(r, bytes) else str(r) for r in result]

    def teardown(self):
        for key in self.keys:
            self.conn.delete(key)
        self.conn.close()

    def name(self) -> str:
        # encode shard / pipeline configuration into the reported name
        suffix = ""
        if self.shards > 1:
            suffix += f"-{self.shards}shards"
        if self.pipeline_depth > 1:
            suffix += f"-p{self.pipeline_depth}"
        return "ember" + suffix


class EmberGrpcClient(VectorClient):
Expand Down Expand Up @@ -316,15 +333,56 @@ def name(self) -> str:
# ---------------------------------------------------------------------------

def benchmark_insert(client: VectorClient, vectors: np.ndarray,
batch_size: int = 500) -> dict:
"""measure insert throughput. returns vectors/sec."""
batch_size: int = 2000, pipeline_depth: int = 1) -> dict:
"""measure insert throughput.

pipeline_depth > 1 uses redis-py's pipeline to send multiple batches
concurrently (ember-only). other systems ignore this parameter.
"""
n = len(vectors)
ids = [f"vec_{i}" for i in range(n)]

use_pipeline = (
pipeline_depth > 1
and isinstance(client, EmberClient)
and hasattr(client.conn, "pipeline")
)
use_sharding = isinstance(client, EmberClient) and client.shards > 1

start = time.perf_counter()
for i in range(0, n, batch_size):
end_idx = min(i + batch_size, n)
client.insert_batch(ids[i:end_idx], vectors[i:end_idx])

if use_pipeline:
# send pipeline_depth batches before waiting for responses
batches = []
for i in range(0, n, batch_size):
end_idx = min(i + batch_size, n)
key = client._key_for_batch(len(batches)) if use_sharding else None
batches.append((ids[i:end_idx], vectors[i:end_idx], key))

for chunk_start in range(0, len(batches), pipeline_depth):
chunk = batches[chunk_start:chunk_start + pipeline_depth]
pipe = client.conn.pipeline(transaction=False)
for batch_ids, batch_vecs, key in chunk:
target = key or client.keys[0]
dim = batch_vecs.shape[1]
args = [target, "DIM", str(dim)]
for j, vid in enumerate(batch_ids):
args.append(vid)
args.extend(str(float(v)) for v in batch_vecs[j])
args += ["METRIC", "COSINE", "M", "16", "EF", "64"]
pipe.execute_command("VADD_BATCH", *args)
pipe.execute()
else:
batch_idx = 0
for i in range(0, n, batch_size):
end_idx = min(i + batch_size, n)
key = client._key_for_batch(batch_idx) if use_sharding else None
if key is not None:
client.insert_batch(ids[i:end_idx], vectors[i:end_idx], key=key)
else:
client.insert_batch(ids[i:end_idx], vectors[i:end_idx])
batch_idx += 1

elapsed = time.perf_counter() - start

# for pgvector, create the HNSW index after all inserts
Expand All @@ -338,6 +396,8 @@ def benchmark_insert(client: VectorClient, vectors: np.ndarray,

return {
"vectors": n,
"batch_size": batch_size,
"pipeline_depth": pipeline_depth,
"elapsed_sec": round(elapsed, 3),
"index_time_sec": round(index_time, 3),
"throughput": round(throughput, 1),
Expand Down Expand Up @@ -412,7 +472,11 @@ def main():
parser.add_argument("--count", type=int, default=100000)
parser.add_argument("--queries", type=int, default=1000)
parser.add_argument("--k", type=int, default=10)
parser.add_argument("--batch-size", type=int, default=500)
parser.add_argument("--batch-size", type=int, default=2000)
parser.add_argument("--pipeline-depth", type=int, default=1,
help="redis pipeline depth for ember (send N batches concurrently)")
parser.add_argument("--shards", type=int, default=1,
help="distribute vectors across N keys (ember only, leverages thread-per-core)")
parser.add_argument("--ember-port", type=int, default=6379)
parser.add_argument("--ember-grpc-port", type=int, default=6380)
parser.add_argument("--chroma-port", type=int, default=8000)
Expand All @@ -424,7 +488,8 @@ def main():

# create client
if args.system == "ember":
client = EmberClient(port=args.ember_port)
client = EmberClient(port=args.ember_port, shards=args.shards,
pipeline_depth=args.pipeline_depth)
elif args.system == "ember-grpc":
client = EmberGrpcClient(port=args.ember_grpc_port)
elif args.system == "chromadb":
Expand Down Expand Up @@ -453,8 +518,9 @@ def main():
client.setup(dim)

# run benchmarks
print(f"benchmarking {client.name()} insert...", file=sys.stderr)
insert_result = benchmark_insert(client, base_vectors, batch_size=args.batch_size)
print(f"benchmarking {client.name()} insert (batch={args.batch_size}, pipeline={args.pipeline_depth})...", file=sys.stderr)
insert_result = benchmark_insert(client, base_vectors, batch_size=args.batch_size,
pipeline_depth=args.pipeline_depth)
print(f" {insert_result['throughput']:.0f} vectors/sec", file=sys.stderr)

print(f"benchmarking {client.name()} query (k={args.k})...", file=sys.stderr)
Expand Down
7 changes: 6 additions & 1 deletion bench/bench-vector.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
# VECTOR_COUNT base vector count (default: 100000)
# VECTOR_DIM vector dimensions (default: 128)
# QUERY_COUNT query vector count (default: 1000)
# BATCH_SIZE vectors per batch (default: 2000)
# PIPELINE_DEPTH redis pipeline depth (default: 1, ember only)
# EMBER_BIN ember-server binary (default: ./target/release/ember-server)

set -euo pipefail
Expand All @@ -38,7 +40,8 @@ VECTOR_COUNT="${VECTOR_COUNT:-100000}"
VECTOR_DIM="${VECTOR_DIM:-128}"
QUERY_COUNT="${QUERY_COUNT:-1000}"
K=10
BATCH_SIZE=500
BATCH_SIZE="${BATCH_SIZE:-2000}"
PIPELINE_DEPTH="${PIPELINE_DEPTH:-1}"
EMBER_BIN="${EMBER_BIN:-./target/release/ember-server}"
RESULTS_DIR="bench/results"
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
Expand Down Expand Up @@ -284,6 +287,7 @@ else
echo "vectors: $VECTOR_COUNT base, $QUERY_COUNT queries"
echo "dimensions: $VECTOR_DIM"
fi
echo "batch: $BATCH_SIZE vectors/batch, pipeline depth=$PIPELINE_DEPTH"
echo "k: $K"
echo "hnsw: M=16, ef_construction=64"
echo "metric: cosine"
Expand All @@ -297,6 +301,7 @@ BENCH_ARGS=(
--queries "$QUERY_COUNT"
--k "$K"
--batch-size "$BATCH_SIZE"
--pipeline-depth "$PIPELINE_DEPTH"
--ember-port "$EMBER_PORT"
--ember-grpc-port "$EMBER_GRPC_PORT"
--chroma-port "$CHROMA_PORT"
Expand Down
41 changes: 22 additions & 19 deletions crates/ember-core/src/keyspace/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ impl Keyspace {

/// Adds multiple vectors to a vector set in a single operation.
///
/// All vectors are validated upfront (NaN/inf check) before any are inserted.
/// Memory is estimated for the entire batch with one `enforce_memory_limit` call.
/// All vectors are validated upfront (NaN/inf check) before any are inserted,
/// then inserted via the pre-validated path to skip redundant per-element checks.
/// Memory is tracked incrementally during the batch loop (no full-set rescan).
/// On usearch error mid-batch, returns the error but already-applied vectors
/// are included in the result for AOF persistence.
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -130,7 +131,7 @@ impl Keyspace {
)));
}
for &v in vec {
if v.is_nan() || v.is_infinite() {
if !v.is_finite() {
return Err(VectorWriteError::IndexError(format!(
"element '{elem}' contains NaN or infinity"
)));
Expand Down Expand Up @@ -172,10 +173,10 @@ impl Keyspace {
Some(e) => e,
None => return Err(VectorWriteError::IndexError("entry missing".into())),
};
let old_entry_size = entry.entry_size(key);

let mut added_count = 0;
let mut applied = Vec::with_capacity(entries.len());
let mut bytes_added: usize = 0;

match entry.value {
Value::Vector(ref mut vs) => {
Expand All @@ -185,26 +186,29 @@ impl Keyspace {
return Err(VectorWriteError::IndexError(e.to_string()));
}

// per-element fixed cost (vector storage + graph edges + map overhead)
let per_elem = vs.per_element_bytes();

for (element, vector) in entries {
// clone element for the index (which needs ownership),
// then move both element and vector into applied
match vs.add(element.clone(), &vector) {
Ok(added) => {
if added {
let name_len = element.len();
// vectors validated upfront — skip redundant per-element checks
match vs.add_pre_validated(element.clone(), &vector) {
Ok(is_new_elem) => {
if is_new_elem {
added_count += 1;
bytes_added += per_elem + name_len;
}
applied.push((element, vector));
}
Err(e) => {
// partial insert: return applied vectors so they can
// be persisted to AOF despite the error
// partial insert: apply incremental tracking for what
// succeeded, then return error with applied vectors
entry.touch();
self.next_version += 1;
entry.version = self.next_version;
let new_vs = memory::value_size(&entry.value);
entry.cached_value_size = new_vs;
let new_entry_size = key.len() + new_vs + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_entry_size, new_entry_size);
entry.cached_value_size =
entry.cached_value_size.saturating_add(bytes_added);
self.memory.grow_by(bytes_added);
return Err(VectorWriteError::PartialBatch {
message: format!(
"error at element '{}': {e} ({} vectors applied before failure)",
Expand All @@ -223,10 +227,9 @@ impl Keyspace {
entry.touch();
self.next_version += 1;
entry.version = self.next_version;
let new_vs = memory::value_size(&entry.value);
entry.cached_value_size = new_vs;
let new_entry_size = key.len() + new_vs + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_entry_size, new_entry_size);
// incremental tracking — no full-set rescan via memory::value_size()
entry.cached_value_size = entry.cached_value_size.saturating_add(bytes_added);
self.memory.grow_by(bytes_added);

Ok(VAddBatchResult {
added_count,
Expand Down
Loading
Loading