Antarys Edge

Antarys vector db (lite), built on top of uSearch, RocksDB, and http.zig for small to medium sized projects. This version of Antarys also works with our standard Python/Node.js clients through compatible APIs (vector operations only; embedding support is planned).

You can also use this as a starting point to build your own vector db, as long as you are on Zig 0.15.1.

This project uses uSearch for HNSW indexing and search; it is not the same as our proprietary search engine. To support this project, we built Zig binding libraries for the components below (a minimal usage sketch follows the list):

  • uSearch
  • rocksdb (forked from rocksdb-zig for 0.15.1)
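
As a rough illustration of how the bindings are meant to be used, here is a minimal sketch that builds an index, adds one vector, and searches it. The Index.init options and the add signature are assumptions made for illustration; only search, get, and the expansion/thread setters appear in the binding code shown below, so check the binding source for the exact API.

const std = @import("std");
const usearch = @import("usearch");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Hypothetical init signature: allocator plus an options struct.
    var index = try usearch.Index.init(allocator, .{
        .dimensions = 128,
        .metric = .cosine,
    });
    defer index.deinit();

    // Hypothetical add call: uSearch key first, then the vector slice.
    const vector = [_]f32{0.5} ** 128;
    try index.add(1, &vector);

    // search() and the result fields (.key, .distance) match the helper below;
    // the caller owns and frees the returned slice.
    const hits = try index.search(&vector, 5);
    defer allocator.free(hits);
    for (hits) |hit| {
        std.debug.print("key={} distance={d:.4}\n", .{ hit.key, hit.distance });
    }
}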

To get started, clone the project and build it:

git clone https://github.com/antarys-ai/edge.git
cd edge
zig build
zig build run

This has only been tested on Apple ARM. Release builds currently have issues with UBSan caused by RocksDB; debug builds work on macOS but may throw a linker dump error. Please submit issues for other platforms as well.

uSearch Bindings

pub fn search(
    index: *usearch.Index,
    query: []const f32,
    options: SearchOptions,
    id_map: *const IdMap,
    allocator: std.mem.Allocator,
) SearchError![]SearchResult {
    if (query.len == 0) return SearchError.EmptyQuery;

    if (options.expansion) |exp| {
        index.setExpansionSearch(exp) catch {};
    }

    if (options.threads) |threads| {
        index.setThreadsSearch(threads) catch {};
    }

    const raw_results = index.search(query, options.limit) catch |err| {
        return switch (err) {
            error.IndexUninitialized => SearchError.IndexUninitialized,
            error.EmptyVector => SearchError.EmptyQuery,
            error.DimensionMismatch => SearchError.InvalidDimensions,
            else => SearchError.SearchFailed,
        };
    };
    defer allocator.free(raw_results);

    var results = try std.ArrayList(SearchResult).initCapacity(allocator, 100);
    errdefer {
        for (results.items) |*r| r.deinit(allocator);
        results.deinit(allocator);
    }

    for (raw_results) |raw| {
        if (options.filter) |filter_fn| {
            if (!filter_fn(raw.key)) continue;
        }

        const id = id_map.get(raw.key) orelse continue;
        const id_copy = try allocator.dupe(u8, id);

        var vec: ?[]f32 = null;
        if (options.include_vectors) {
            if (index.get(raw.key, 1)) |v| {
                vec = v;
            } else |_| {}
        }

        try results.append(allocator, .{
            .id = id_copy,
            .distance = raw.distance,
            .vector = vec,
        });
    }

    return results.toOwnedSlice(allocator);
}
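
A minimal caller for this helper might look like the sketch below. The index, id_map, and allocator setup is assumed to exist already, and every SearchOptions field is spelled out because their defaults are not shown above.

// Sketch: calling the search helper above with an already-populated
// index and id_map.
const query = [_]f32{0.1} ** 128;
const results = try search(&index, &query, .{
    .limit = 10,
    .include_vectors = false,
    .expansion = 64, // optional HNSW search expansion override
    .threads = null, // optional search thread count override
    .filter = null,  // optional predicate on the uSearch key
}, &id_map, allocator);
defer {
    for (results) |*r| r.deinit(allocator);
    allocator.free(results);
}

for (results) |r| {
    std.debug.print("{s}: {d:.4}\n", .{ r.id, r.distance });
}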

Vector DB functions

const db_path = ".test-antarysdb-basic";

std.debug.print("→ Initializing database...\n", .{});
var db = try AntarysDB.init(allocator, .{
    .storage_path = db_path,
    .enable_cache = true,
});
defer db.deinit();

std.debug.print("→ Creating collection...\n", .{});
try db.createCollection("vectors", .{
    .dimensions = 128,
    .metric = .cosine,
});

if (!db.hasCollection("vectors")) {
    std.debug.print("ERROR: Collection not found!\n", .{});
    return error.TestFailed;
}
std.debug.print("Collection created\n", .{});

Internal ThreadPool API

const std = @import("std");
const ThreadPool = @import("threadpool.zig").ThreadPool;
const BatchCoordinator = @import("threadpool.zig").BatchCoordinator;

const ComputeTask = struct {
    id: usize,
    data: []f64,
    result: *std.atomic.Value(f64),

    fn execute(ctx: *anyopaque) void {
        const self: *ComputeTask = @ptrCast(@alignCast(ctx));

        var sum: f64 = 0.0;
        for (self.data) |val| {
            sum += @sqrt(val * val + 1.0);
        }

        self.result.store(sum, .release);
        std.debug.print("Task {} completed: {d:.2}\n", .{ self.id, sum });
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    try basicExample(allocator);
    try batchExample(allocator);
    try workStealingExample(allocator);
}

fn basicExample(allocator: std.mem.Allocator) !void {
    var pool = try ThreadPool.init(allocator, .{
        .num_workers = 4,
        .enable_affinity = true,
    });
    defer pool.deinit();

    std.debug.print("Pool initialized with {} workers\n", .{pool.runningWorkers()});

    const num_tasks = 8;
    var results: [num_tasks]std.atomic.Value(f64) = undefined;
    var tasks: [num_tasks]ComputeTask = undefined;

    var all_data: [num_tasks][]f64 = undefined;
    defer {
        for (all_data) |data| {
            allocator.free(data);
        }
    }

    for (&tasks, 0..) |*task, i| {
        results[i] = std.atomic.Value(f64).init(0.0);

        const data = try allocator.alloc(f64, 1000);
        all_data[i] = data;
        for (data, 0..) |*d, j| {
            d.* = @as(f64, @floatFromInt(i * 1000 + j));
        }

        task.* = .{
            .id = i,
            .data = data,
            .result = &results[i],
        };

        try pool.submit(ComputeTask.execute, task);
    }

    pool.waitIdle();

    std.debug.print("All tasks completed!\n", .{});
    std.debug.print("Pending tasks: {}\n", .{pool.pendingTasks()});
    std.debug.print("Active workers: {}\n", .{pool.activeWorkers()});
}

fn batchExample(allocator: std.mem.Allocator) !void {
    var pool = try ThreadPool.init(allocator, .{});
    defer pool.deinit();

    const BatchTask = struct {
        id: usize,
        should_fail: bool,

        fn execute(ctx: *anyopaque) anyerror!void {
            const self: *@This() = @ptrCast(@alignCast(ctx));

            if (self.should_fail) {
                return error.TaskFailed;
            }

            std.Thread.sleep(100 * std.time.ns_per_ms);
            std.debug.print("Batch task {} succeeded\n", .{self.id});
        }
    };

    const num_tasks = 10;
    var coordinator = try BatchCoordinator.init(allocator, &pool, num_tasks);
    defer coordinator.deinit();

    var tasks: [num_tasks]BatchTask = undefined;
    for (&tasks, 0..) |*task, i| {
        task.* = .{
            .id = i,
            .should_fail = (i == 5),
        };
        try coordinator.submitWork(BatchTask.execute, task);
    }

    coordinator.wait() catch |err| {
        std.debug.print("Batch failed with error: {}\n", .{err});
        std.debug.print("Completed: {}/{}\n", .{ coordinator.completedCount(), num_tasks });
        std.debug.print("Errors: {}\n", .{coordinator.errorCount()});
        return;
    };

    std.debug.print("Batch completed successfully!\n", .{});
}

fn workStealingExample(allocator: std.mem.Allocator) !void {
    var pool = try ThreadPool.init(allocator, .{
        .num_workers = 2,
        .local_queue_capacity = 4,
    });
    defer pool.deinit();

    const QuickTask = struct {
        id: usize,
        worker_hint: usize,

        fn execute(ctx: *anyopaque) void {
            const self: *@This() = @ptrCast(@alignCast(ctx));
            std.debug.print("Task {} executing\n", .{self.id});

            var sum: u64 = 0;
            var i: u64 = 0;
            while (i < 1000) : (i += 1) {
                sum += i;
            }
        }
    };

    const num_tasks = 20;
    var tasks: [num_tasks]QuickTask = undefined;

    for (&tasks, 0..) |*task, i| {
        task.* = .{
            .id = i,
            .worker_hint = i % 2,
        };
        try pool.submit(QuickTask.execute, task);
    }

    pool.waitIdle();
    std.debug.print("Work stealing example completed!\n", .{});
}

const PoolStats = struct {
    start_time: i64,
    tasks_submitted: usize,
    tasks_completed: usize,

    fn create() PoolStats {
        return .{
            .start_time = std.time.milliTimestamp(),
            .tasks_submitted = 0,
            .tasks_completed = 0,
        };
    }

    fn printStats(self: *const PoolStats, pool: *ThreadPool) void {
        const elapsed = std.time.milliTimestamp() - self.start_time;
        const throughput = if (elapsed > 0)
            @as(f64, @floatFromInt(self.tasks_completed * 1000)) / @as(f64, @floatFromInt(elapsed))
        else
            0.0;

        std.debug.print("\nPool Statistics \n", .{});
        std.debug.print("Elapsed: {}ms\n", .{elapsed});
        std.debug.print("Tasks submitted: {}\n", .{self.tasks_submitted});
        std.debug.print("Tasks completed: {}\n", .{self.tasks_completed});
        std.debug.print("Throughput: {d:.2} tasks/sec\n", .{throughput});
        std.debug.print("Pending: {}\n", .{pool.pendingTasks()});
        std.debug.print("Active workers: {}\n", .{pool.activeWorkers()});
    }
};
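
PoolStats is a plain helper struct: nothing in ThreadPool updates its counters, so the caller bumps them around submit calls. A minimal usage sketch, reusing ComputeTask from above:

fn statsExample(allocator: std.mem.Allocator) !void {
    var pool = try ThreadPool.init(allocator, .{ .num_workers = 4 });
    defer pool.deinit();

    var stats = PoolStats.create();

    const num_tasks = 4;
    var results: [num_tasks]std.atomic.Value(f64) = undefined;
    var tasks: [num_tasks]ComputeTask = undefined;
    var data = [_]f64{ 1.0, 2.0, 3.0, 4.0 };

    for (&tasks, 0..) |*task, i| {
        results[i] = std.atomic.Value(f64).init(0.0);
        task.* = .{ .id = i, .data = &data, .result = &results[i] };
        try pool.submit(ComputeTask.execute, task);
        stats.tasks_submitted += 1;
    }

    pool.waitIdle();
    // Every submitted task has finished once the pool is idle.
    stats.tasks_completed = stats.tasks_submitted;

    stats.printStats(&pool);
}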

Performance

This version of Antarys is built for small teams experimenting with the Antarys client API. It uses the HTTP/1.1 protocol and is not primarily built for performance: search is quite close to Qdrant/Pinecone, but indexing is still slow compared to other engines. Internally it uses uSearch for HNSW indexing and search.
