Skip to content

Commit

Permalink
Bugfix/update nodevm sandbox options, sanitize tablename (#3818)
Browse files Browse the repository at this point in the history
* update nodevm sandbox options, sanitize tablename

* sanitize file name when getFileFromStorage
  • Loading branch information
HenryHengZJ authored Jan 7, 2025
1 parent 2280159 commit 9a417bd
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,14 @@ class CustomDocumentLoader_DocumentLoaders implements INode {
}
}

let sandbox: any = { $input: input }
let sandbox: any = {
$input: input,
util: undefined,
Symbol: undefined,
child_process: undefined,
fs: undefined,
process: undefined
}
sandbox['$vars'] = prepareSandboxVars(variables)
sandbox['$flow'] = flow

Expand All @@ -128,7 +135,10 @@ class CustomDocumentLoader_DocumentLoaders implements INode {
require: {
external: { modules: deps },
builtin: builtinDeps
}
},
eval: false,
wasm: false,
timeout: 10000
} as any

const vm = new NodeVM(nodeVMOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
this.threadId = threadId
}

sanitizeTableName(tableName: string): string {
// Trim and normalize case, turn whitespace into underscores
tableName = tableName.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(tableName)) {
throw new Error('Invalid table name')
}

return tableName
}

private async getDataSource(): Promise<DataSource> {
const { datasourceOptions } = this.config
if (!datasourceOptions) {
Expand All @@ -38,8 +50,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {

try {
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS ${this.tableName} (
CREATE TABLE IF NOT EXISTS ${tableName} (
thread_id VARCHAR(255) NOT NULL,
checkpoint_id VARCHAR(255) NOT NULL,
parent_id VARCHAR(255),
Expand All @@ -62,12 +75,13 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {

const thread_id = config.configurable?.thread_id || this.threadId
const checkpoint_id = config.configurable?.checkpoint_id
const tableName = this.sanitizeTableName(this.tableName)

try {
const queryRunner = dataSource.createQueryRunner()
const sql = checkpoint_id
? `SELECT checkpoint, parent_id, metadata FROM ${this.tableName} WHERE thread_id = ? AND checkpoint_id = ?`
: `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`
? `SELECT checkpoint, parent_id, metadata FROM ${tableName} WHERE thread_id = ? AND checkpoint_id = ?`
: `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`

const rows = await queryRunner.manager.query(sql, checkpoint_id ? [thread_id, checkpoint_id] : [thread_id])
await queryRunner.release()
Expand Down Expand Up @@ -108,7 +122,8 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
const queryRunner = dataSource.createQueryRunner()
try {
const threadId = config.configurable?.thread_id || this.threadId
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ${
const tableName = this.sanitizeTableName(this.tableName)
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = ? ${
before ? 'AND checkpoint_id < ?' : ''
} ORDER BY checkpoint_id DESC`
if (limit) {
Expand Down Expand Up @@ -163,8 +178,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
Buffer.from(this.serde.stringify(checkpoint)), // Encode to binary
Buffer.from(this.serde.stringify(metadata)) // Encode to binary
]
const tableName = this.sanitizeTableName(this.tableName)

const query = `INSERT INTO ${this.tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata)
const query = `INSERT INTO ${tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE checkpoint = VALUES(checkpoint), metadata = VALUES(metadata)`

Expand All @@ -190,10 +206,11 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {

const dataSource = await this.getDataSource()
await this.setup(dataSource)
const tableName = this.sanitizeTableName(this.tableName)

try {
const queryRunner = dataSource.createQueryRunner()
const query = `DELETE FROM ${this.tableName} WHERE thread_id = ?;`
const query = `DELETE FROM ${tableName} WHERE thread_id = ?;`
await queryRunner.manager.query(query, [threadId])
await queryRunner.release()
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods
this.threadId = threadId
}

sanitizeTableName(tableName: string): string {
// Trim and normalize case, turn whitespace into underscores
tableName = tableName.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(tableName)) {
throw new Error('Invalid table name')
}

return tableName
}

private async getDataSource(): Promise<DataSource> {
const { datasourceOptions } = this.config
if (!datasourceOptions) {
Expand All @@ -40,8 +52,9 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods

try {
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS ${this.tableName} (
CREATE TABLE IF NOT EXISTS ${tableName} (
thread_id TEXT NOT NULL,
checkpoint_id TEXT NOT NULL,
parent_id TEXT,
Expand All @@ -63,12 +76,13 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const thread_id = config.configurable?.thread_id || this.threadId
const checkpoint_id = config.configurable?.checkpoint_id
const tableName = this.sanitizeTableName(this.tableName)

if (checkpoint_id) {
try {
const queryRunner = dataSource.createQueryRunner()
const keys = [thread_id, checkpoint_id]
const sql = `SELECT checkpoint, parent_id, metadata FROM ${this.tableName} WHERE thread_id = $1 AND checkpoint_id = $2`
const sql = `SELECT checkpoint, parent_id, metadata FROM ${tableName} WHERE thread_id = $1 AND checkpoint_id = $2`

const rows = await queryRunner.manager.query(sql, keys)
await queryRunner.release()
Expand All @@ -89,16 +103,16 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
console.error(`Error retrieving ${tableName}`, error)
throw new Error(`Error retrieving ${tableName}`)
} finally {
await dataSource.destroy()
}
} else {
try {
const queryRunner = dataSource.createQueryRunner()
const keys = [thread_id]
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = $1 ORDER BY checkpoint_id DESC LIMIT 1`
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = $1 ORDER BY checkpoint_id DESC LIMIT 1`

const rows = await queryRunner.manager.query(sql, keys)
await queryRunner.release()
Expand All @@ -124,8 +138,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
console.error(`Error retrieving ${tableName}`, error)
throw new Error(`Error retrieving ${tableName}`)
} finally {
await dataSource.destroy()
}
Expand All @@ -139,7 +153,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const queryRunner = dataSource.createQueryRunner()
const thread_id = config.configurable?.thread_id || this.threadId
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = $1`
const tableName = this.sanitizeTableName(this.tableName)
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = $1`
const args = [thread_id]

if (before?.configurable?.checkpoint_id) {
Expand Down Expand Up @@ -179,8 +194,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error listing ${this.tableName}`, error)
throw new Error(`Error listing ${this.tableName}`)
console.error(`Error listing ${tableName}`, error)
throw new Error(`Error listing ${tableName}`)
} finally {
await dataSource.destroy()
}
Expand All @@ -200,8 +215,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
Buffer.from(this.serde.stringify(checkpoint)), // Encode to binary
Buffer.from(this.serde.stringify(metadata)) // Encode to binary
]
const tableName = this.sanitizeTableName(this.tableName)

const query = `INSERT INTO ${this.tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata)
const query = `INSERT INTO ${tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (thread_id, checkpoint_id)
DO UPDATE SET checkpoint = EXCLUDED.checkpoint, metadata = EXCLUDED.metadata`
Expand Down Expand Up @@ -230,8 +246,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const dataSource = await this.getDataSource()
await this.setup(dataSource)
const tableName = this.sanitizeTableName(this.tableName)

const query = `DELETE FROM "${this.tableName}" WHERE thread_id = $1;`
const query = `DELETE FROM "${tableName}" WHERE thread_id = $1;`

try {
const queryRunner = dataSource.createQueryRunner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ export class SqliteSaver extends BaseCheckpointSaver implements MemoryMethods {
this.threadId = threadId
}

sanitizeTableName(tableName: string): string {
// Trim and normalize case, turn whitespace into underscores
tableName = tableName.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(tableName)) {
throw new Error('Invalid table name')
}

return tableName
}

private async getDataSource(): Promise<DataSource> {
const { datasourceOptions } = this.config
const dataSource = new DataSource(datasourceOptions)
Expand All @@ -33,8 +45,9 @@ export class SqliteSaver extends BaseCheckpointSaver implements MemoryMethods {

try {
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS ${this.tableName} (
CREATE TABLE IF NOT EXISTS ${tableName} (
thread_id TEXT NOT NULL,
checkpoint_id TEXT NOT NULL,
parent_id TEXT,
Expand All @@ -56,12 +69,13 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const thread_id = config.configurable?.thread_id || this.threadId
const checkpoint_id = config.configurable?.checkpoint_id
const tableName = this.sanitizeTableName(this.tableName)

if (checkpoint_id) {
try {
const queryRunner = dataSource.createQueryRunner()
const keys = [thread_id, checkpoint_id]
const sql = `SELECT checkpoint, parent_id, metadata FROM ${this.tableName} WHERE thread_id = ? AND checkpoint_id = ?`
const sql = `SELECT checkpoint, parent_id, metadata FROM ${tableName} WHERE thread_id = ? AND checkpoint_id = ?`

const rows = await queryRunner.manager.query(sql, [...keys])
await queryRunner.release()
Expand All @@ -82,16 +96,16 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
console.error(`Error retrieving ${tableName}`, error)
throw new Error(`Error retrieving ${tableName}`)
} finally {
await dataSource.destroy()
}
} else {
try {
const queryRunner = dataSource.createQueryRunner()
const keys = [thread_id]
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1`

const rows = await queryRunner.manager.query(sql, [...keys])
await queryRunner.release()
Expand All @@ -117,8 +131,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
console.error(`Error retrieving ${tableName}`, error)
throw new Error(`Error retrieving ${tableName}`)
} finally {
await dataSource.destroy()
}
Expand All @@ -132,7 +146,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const queryRunner = dataSource.createQueryRunner()
const thread_id = config.configurable?.thread_id || this.threadId
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = ? ${
const tableName = this.sanitizeTableName(this.tableName)
let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = ? ${
before ? 'AND checkpoint_id < ?' : ''
} ORDER BY checkpoint_id DESC`
if (limit) {
Expand Down Expand Up @@ -167,8 +182,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
}
}
} catch (error) {
console.error(`Error listing ${this.tableName}`, error)
throw new Error(`Error listing ${this.tableName}`)
console.error(`Error listing ${tableName}`, error)
throw new Error(`Error listing ${tableName}`)
} finally {
await dataSource.destroy()
}
Expand All @@ -188,8 +203,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
this.serde.stringify(checkpoint),
this.serde.stringify(metadata)
]

const query = `INSERT OR REPLACE INTO ${this.tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata) VALUES (?, ?, ?, ?, ?)`
const tableName = this.sanitizeTableName(this.tableName)
const query = `INSERT OR REPLACE INTO ${tableName} (thread_id, checkpoint_id, parent_id, checkpoint, metadata) VALUES (?, ?, ?, ?, ?)`

await queryRunner.manager.query(query, row)
await queryRunner.release()
Expand All @@ -215,8 +230,8 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (

const dataSource = await this.getDataSource()
await this.setup(dataSource)

const query = `DELETE FROM "${this.tableName}" WHERE thread_id = ?;`
const tableName = this.sanitizeTableName(this.tableName)
const query = `DELETE FROM "${tableName}" WHERE thread_id = ?;`

try {
const queryRunner = dataSource.createQueryRunner()
Expand Down
Loading

0 comments on commit 9a417bd

Please sign in to comment.