Skip to content

Commit

Permalink
community[minor]: Added SQLiteRecordManager (#4321)
Browse files Browse the repository at this point in the history
* created sqlite record manager and integration tests

* Updated tests and implementation

* Updated tests and implementation

Updated implementation to make it cleaner

* some tiny refactors

* cr

* update peer deps, add to config file

* format

---------

Co-authored-by: bracesproul <braceasproul@gmail.com>
  • Loading branch information
jasonnathan and bracesproul committed Feb 9, 2024
1 parent b52852b commit 3429110
Show file tree
Hide file tree
Showing 6 changed files with 461 additions and 0 deletions.
4 changes: 4 additions & 0 deletions libs/langchain-community/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,10 @@ indexes/memory.cjs
indexes/memory.js
indexes/memory.d.ts
indexes/memory.d.cts
indexes/sqlite.cjs
indexes/sqlite.js
indexes/sqlite.d.ts
indexes/sqlite.d.cts
util/convex.cjs
util/convex.js
util/convex.d.ts
Expand Down
2 changes: 2 additions & 0 deletions libs/langchain-community/langchain.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ export const config = {
"indexes/base": "indexes/base",
"indexes/postgres": "indexes/postgres",
"indexes/memory": "indexes/memory",
"indexes/sqlite": "indexes/sqlite",
// utils
"util/convex": "utils/convex",
"utils/event_source_parse": "utils/event_source_parse",
Expand Down Expand Up @@ -334,6 +335,7 @@ export const config = {
"util/convex",
// indexes
"indexes/postgres",
"indexes/sqlite",
],
packageSuffix: "community",
tsConfigPath: resolve("./tsconfig.json"),
Expand Down
19 changes: 19 additions & 0 deletions libs/langchain-community/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"@tensorflow/tfjs-converter": "^3.6.0",
"@tensorflow/tfjs-core": "^3.6.0",
"@tsconfig/recommended": "^1.0.2",
"@types/better-sqlite3": "^7.6.9",
"@types/flat": "^5.0.2",
"@types/html-to-text": "^9",
"@types/jsdom": "^21.1.1",
Expand All @@ -111,6 +112,7 @@
"@xata.io/client": "^0.28.0",
"@xenova/transformers": "^2.5.4",
"@zilliz/milvus2-sdk-node": ">=2.2.11",
"better-sqlite3": "^9.4.0",
"cassandra-driver": "^4.7.2",
"chromadb": "^1.5.3",
"closevector-common": "0.1.3",
Expand Down Expand Up @@ -210,6 +212,7 @@
"@xata.io/client": "^0.28.0",
"@xenova/transformers": "^2.5.4",
"@zilliz/milvus2-sdk-node": ">=2.2.7",
"better-sqlite3": "^9.4.0",
"cassandra-driver": "^4.7.2",
"chromadb": "*",
"closevector-common": "0.1.3",
Expand Down Expand Up @@ -381,6 +384,9 @@
"@zilliz/milvus2-sdk-node": {
"optional": true
},
"better-sqlite3": {
"optional": true
},
"cassandra-driver": {
"optional": true
},
Expand Down Expand Up @@ -2000,6 +2006,15 @@
"import": "./indexes/memory.js",
"require": "./indexes/memory.cjs"
},
"./indexes/sqlite": {
"types": {
"import": "./indexes/sqlite.d.ts",
"require": "./indexes/sqlite.d.cts",
"default": "./indexes/sqlite.d.ts"
},
"import": "./indexes/sqlite.js",
"require": "./indexes/sqlite.cjs"
},
"./util/convex": {
"types": {
"import": "./util/convex.d.ts",
Expand Down Expand Up @@ -2699,6 +2714,10 @@
"indexes/memory.js",
"indexes/memory.d.ts",
"indexes/memory.d.cts",
"indexes/sqlite.cjs",
"indexes/sqlite.js",
"indexes/sqlite.d.ts",
"indexes/sqlite.d.cts",
"util/convex.cjs",
"util/convex.js",
"util/convex.d.ts",
Expand Down
234 changes: 234 additions & 0 deletions libs/langchain-community/src/indexes/sqlite.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import Database, { Database as DatabaseType, Statement } from "better-sqlite3";
import {
ListKeyOptions,
RecordManagerInterface,
UpdateOptions,
} from "./base.js";

interface TimeRow {
epoch: number;
}

interface KeyRecord {
key: string;
}

/**
* Options for configuring the SQLiteRecordManager class.
*/
export type SQLiteRecordManagerOptions = {
/**
* The file path of the SQLite database.
* One of either `localPath` or `connectionString` is required.
*/
localPath?: string;
/**
* The connection string of the SQLite database.
* One of either `localPath` or `connectionString` is required.
*/
connectionString?: string;
/**
* The name of the table in the SQLite database.
*/
tableName: string;
};

export class SQLiteRecordManager implements RecordManagerInterface {
lc_namespace = ["langchain", "recordmanagers", "sqlite"];

tableName: string;

db: DatabaseType;

namespace: string;

constructor(namespace: string, config: SQLiteRecordManagerOptions) {
const { localPath, connectionString, tableName } = config;
if (!connectionString && !localPath) {
throw new Error(
"One of either `localPath` or `connectionString` is required."
);
}
if (connectionString && localPath) {
throw new Error(
"Only one of either `localPath` or `connectionString` is allowed."
);
}
this.namespace = namespace;
this.tableName = tableName;
this.db = new Database(connectionString ?? localPath);
}

async createSchema(): Promise<void> {
try {
this.db.exec(`
CREATE TABLE IF NOT EXISTS "${this.tableName}" (
uuid TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))),
key TEXT NOT NULL,
namespace TEXT NOT NULL,
updated_at REAL NOT NULL,
group_id TEXT,
UNIQUE (key, namespace)
);
CREATE INDEX IF NOT EXISTS updated_at_index ON "${this.tableName}" (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON "${this.tableName}" (key);
CREATE INDEX IF NOT EXISTS namespace_index ON "${this.tableName}" (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`);
} catch (error) {
console.error("Error creating schema");
throw error; // Re-throw the error to let the caller handle it
}
}

async getTime(): Promise<number> {
try {
const statement: Statement<[]> = this.db.prepare(
"SELECT strftime('%s', 'now') AS epoch"
);
const { epoch } = statement.get() as TimeRow;
return Number(epoch);
} catch (error) {
console.error("Error getting time in SQLiteRecordManager:");
throw error;
}
}

async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
if (keys.length === 0) {
return;
}

const updatedAt = await this.getTime();
const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {};

if (timeAtLeast && updatedAt < timeAtLeast) {
throw new Error(
`Time sync issue with database ${updatedAt} < ${timeAtLeast}`
);
}

const groupIds = _groupIds ?? keys.map(() => null);

if (groupIds.length !== keys.length) {
throw new Error(
`Number of keys (${keys.length}) does not match number of group_ids (${groupIds.length})`
);
}

const recordsToUpsert = keys.map((key, i) => [
key,
this.namespace,
updatedAt,
groupIds[i] ?? null, // Ensure groupIds[i] is null if undefined
]);

// Consider using a transaction for batch operations
const updateTransaction = this.db.transaction(() => {
for (const row of recordsToUpsert) {
this.db
.prepare(
`
INSERT INTO "${this.tableName}" (key, namespace, updated_at, group_id)
VALUES (?, ?, ?, ?)
ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at`
)
.run(...row);
}
});
updateTransaction();
}

async exists(keys: string[]): Promise<boolean[]> {
if (keys.length === 0) {
return [];
}

// Prepare the placeholders and the query
const placeholders = keys.map(() => `?`).join(", ");
const sql = `
SELECT key
FROM "${this.tableName}"
WHERE namespace = ? AND key IN (${placeholders})`;

// Initialize an array to fill with the existence checks
const existsArray = new Array(keys.length).fill(false);

try {
// Execute the query
const rows = this.db
.prepare(sql)
.all(this.namespace, ...keys) as KeyRecord[];
// Create a set of existing keys for faster lookup
const existingKeysSet = new Set(rows.map((row) => row.key));
// Map the input keys to booleans indicating if they exist
keys.forEach((key, index) => {
existsArray[index] = existingKeysSet.has(key);
});
return existsArray;
} catch (error) {
console.error("Error checking existence of keys");
throw error; // Allow the caller to handle the error
}
}

async listKeys(options?: ListKeyOptions): Promise<string[]> {
const { before, after, limit, groupIds } = options ?? {};
let query = `SELECT key FROM "${this.tableName}" WHERE namespace = ?`;
const values: (string | number | string[])[] = [this.namespace];

if (before) {
query += ` AND updated_at < ?`;
values.push(before);
}

if (after) {
query += ` AND updated_at > ?`;
values.push(after);
}

if (limit) {
query += ` LIMIT ?`;
values.push(limit);
}

if (groupIds && Array.isArray(groupIds)) {
query += ` AND group_id IN (${groupIds
.filter((gid) => gid !== null)
.map(() => "?")
.join(", ")})`;
values.push(...groupIds.filter((gid): gid is string => gid !== null));
}

query += ";";

// Directly using try/catch with async/await for cleaner flow
try {
const result = this.db.prepare(query).all(...values) as { key: string }[];
return result.map((row) => row.key);
} catch (error) {
console.error("Error listing keys.");
throw error; // Re-throw the error to be handled by the caller
}
}

async deleteKeys(keys: string[]): Promise<void> {
if (keys.length === 0) {
return;
}

const placeholders = keys.map(() => "?").join(", ");
const query = `DELETE FROM "${this.tableName}" WHERE namespace = ? AND key IN (${placeholders});`;
const values = [this.namespace, ...keys].map((v) =>
typeof v !== "string" ? `${v}` : v
);

// Directly using try/catch with async/await for cleaner flow
try {
this.db.prepare(query).run(...values);
} catch (error) {
console.error("Error deleting keys");
throw error; // Re-throw the error to be handled by the caller
}
}
}
Loading

0 comments on commit 3429110

Please sign in to comment.