Skip to content

Commit

Permalink
snowflake connector handles timeout and abortsignal
Browse files Browse the repository at this point in the history
Signed-off-by: Carina Koo <carina@datairis.io>
  • Loading branch information
carinakoo8 committed Jan 9, 2025
1 parent 9cbdd4d commit 8af9492
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
14 changes: 12 additions & 2 deletions packages/malloy-db-snowflake/src/snowflake_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ export interface SnowflakeConnectionOptions {
scratchSpace?: namespace;

queryOptions?: RunSQLOptions;

// Timeout for the statement
timeoutMs?: number;
}

type PathChain =
Expand Down Expand Up @@ -207,6 +210,11 @@ class SnowArray extends SnowField {
}
}

/**
* Default statement timeoutMs value, 10 Mins
*/
const TIMEOUT_MS = 1000 * 60 * 10;

export class SnowflakeConnection
extends BaseConnection
implements
Expand All @@ -221,6 +229,7 @@ export class SnowflakeConnection
// the database & schema where we do temporary operations like creating a temp table
private scratchSpace?: namespace;
private queryOptions: RunSQLOptions;
private timeoutMs: number;

constructor(
public readonly name: string,
Expand All @@ -235,6 +244,7 @@ export class SnowflakeConnection
this.executor = new SnowflakeExecutor(connOptions, options?.poolOptions);
this.scratchSpace = options?.scratchSpace;
this.queryOptions = options?.queryOptions ?? {};
this.timeoutMs = options?.timeoutMs ?? TIMEOUT_MS;
}

get dialectName(): string {
Expand Down Expand Up @@ -273,10 +283,10 @@ export class SnowflakeConnection

public async runSQL(
sql: string,
options?: RunSQLOptions
options: RunSQLOptions = {}
): Promise<MalloyQueryData> {
const rowLimit = options?.rowLimit ?? this.queryOptions?.rowLimit;
let rows = await this.executor.batch(sql);
let rows = await this.executor.batch(sql, options, this.timeoutMs);
if (rowLimit !== undefined && rows.length > rowLimit) {
rows = rows.slice(0, rowLimit);
}
Expand Down
21 changes: 16 additions & 5 deletions packages/malloy-db-snowflake/src/snowflake_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ConnectionConfigFile {
// return ret;
// }


export class SnowflakeExecutor {
private static defaultPoolOptions_: PoolOptions = {
min: 1,
Expand Down Expand Up @@ -149,15 +150,25 @@ export class SnowflakeExecutor {
});
}

public async _execute(sqlText: string, conn: Connection): Promise<QueryData> {
return new Promise((resolve, reject) => {
const _statment = conn.execute({
public async _execute(sqlText: string, conn: Connection, options?: RunSQLOptions, timeoutMs?: number): Promise<QueryData> {
let _statement: RowStatement | undefined;
const cancel = () => {
_statement?.cancel();
}
const timeoutId = timeoutMs ? setTimeout(cancel, timeoutMs) : undefined;
options?.abortSignal?.addEventListener('abort', cancel);
return await new Promise((resolve, reject) => {
_statement = conn.execute({
sqlText,
complete: (
err: SnowflakeError | undefined,
_stmt: RowStatement,
rows?: QueryData
) => {
options?.abortSignal?.removeEventListener('abort', cancel);
if (timeoutId) {
clearTimeout(timeoutId);
}
if (err) {
reject(err);
} else if (rows) {
Expand Down Expand Up @@ -186,10 +197,10 @@ export class SnowflakeExecutor {
);
}

public async batch(sqlText: string): Promise<QueryData> {
public async batch(sqlText: string, options?: RunSQLOptions, timeoutMs?: number): Promise<QueryData> {
return await this.pool_.use(async (conn: Connection) => {
await this._setSessionParams(conn);
return await this._execute(sqlText, conn);
return await this._execute(sqlText, conn, options, timeoutMs);
});
}

Expand Down

0 comments on commit 8af9492

Please sign in to comment.