Skip to content

Commit

Permalink
Updated transaction helper to take an existing plain (non-TxnClient) …
Browse files Browse the repository at this point in the history
…DB client as well as a Pool (closes #107 and #77)
  • Loading branch information
jawj committed May 5, 2022
1 parent c3889d5 commit 9ad9b4f
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions src/db/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
/*
Zapatos: https://jawj.github.io/zapatos/
Copyright (C) 2020 - 2021 George MacKerron
Copyright (C) 2020 - 2022 George MacKerron
Released under the MIT licence: see LICENCE file
*/

import * as pg from 'pg';
import { isDatabaseError } from './pgErrors';
import { wait } from './utils';
import { sql, raw } from './core';
import { getConfig } from "./config";
import { getConfig } from './config';
import type { Queryable } from './core';


export enum IsolationLevel {
Expand Down Expand Up @@ -54,36 +55,39 @@ let txnSeq = 0;
* Provide a database client to the callback, whose queries are then wrapped in
* a database transaction. The transaction is committed, retried, or rolled back
* as appropriate.
* @param txnClientOrPool The `pg.Pool` from which to check out the database
* client or an appropriate transaction client to be passed through
* @param txnClientOrQueryable The `pg.Pool` from which to check out a client,
* a plain client, or an existing transaction client to be passed through
* @param isolationLevel The desired isolation level (e.g.
* `IsolationLevel.Serializable`)
* @param callback A callback function that runs queries on the client provided
* to it
*/
export async function transaction<T, M extends IsolationLevel>(
txnClientOrPool: pg.Pool | TxnClient<IsolationSatisfying<M>>,
txnClientOrQueryable: Queryable | TxnClient<IsolationSatisfying<M>>,
isolationLevel: M,
callback: (client: TxnClient<IsolationSatisfying<M>>) => Promise<T>
): Promise<T> {

if (Object.prototype.hasOwnProperty.call(txnClientOrPool, '_zapatos')) {
// if txnClientOrPool is a TxnClient, just pass it through
return callback(txnClientOrPool as TxnClient<IsolationSatisfying<M>>);
if (Object.prototype.hasOwnProperty.call(txnClientOrQueryable, '_zapatos')) {
// if txnClientOrQueryable is a TxnClient, just pass it through
return callback(txnClientOrQueryable as TxnClient<IsolationSatisfying<M>>);
}

if (txnSeq >= Number.MAX_SAFE_INTEGER - 1) txnSeq = 0; // wrap around

const
txnId = txnSeq++,
txnClient = await txnClientOrPool.connect() as TxnClient<M>,
clientIsOurs = txnClientOrQueryable instanceof pg.Pool, // => we'll check a client out, and release it later
txnClient = (clientIsOurs ? await txnClientOrQueryable.connect() : txnClientOrQueryable) as TxnClient<M>;

txnClient._zapatos = { isolationLevel, txnId };

const
config = getConfig(),
{ transactionListener } = config,
maxAttempts = config.transactionAttemptsMax,
{ minMs, maxMs } = config.transactionRetryDelay;

txnClient._zapatos = { isolationLevel, txnId };

try {
for (let attempt = 1; ; attempt++) {
try {
Expand Down Expand Up @@ -122,7 +126,7 @@ export async function transaction<T, M extends IsolationLevel>(

} finally {
delete txnClient._zapatos;
txnClient.release();
if (clientIsOurs) txnClient.release();
}
}

Expand Down

0 comments on commit 9ad9b4f

Please sign in to comment.