diff --git a/src/index.ts b/src/index.ts index d7d10e5..ff13a47 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,17 @@ const VALID_OPERATIONS = ["SELECT", "UPDATE", "INSERT", "DELETE"] as const; const debug = logger("yates"); +interface Batch { + pgRole: string; + context?: { [x: string]: string | number | string[] }; + requests: Array<{ + query: (args: unknown[]) => PrismaPromise; + args: unknown; + resolve: (result: unknown) => void; + reject: (error: unknown) => void; + }>; +} + const BatchTxIdCounter = { id: 0, nextId() { @@ -316,7 +327,7 @@ export const createClient = ( const { txMaxWait = 30000, txTimeout = 30000 } = options; // @ts-ignore - (prisma as any)._requestHandler.batchBy = (n) => { + (prisma as any)._requestHandler.dataloader.options.batchBy = (n) => { console.log("batch by yates id?", n.transaction?.yates_id); console.log("pq", getBatchId(n.protocolQuery)); return n.transaction?.yates_id @@ -411,6 +422,41 @@ export const createClient = ( return result; }; + let tickActive = false; + const batches: Record = {}; + + const dispatchBatches = () => { + for (const [key, batch] of Object.entries(batches)) { + console.log(key, batch); + prisma + .$transaction([ + prisma.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`), + // Now set all the context variables using `set_config` so that they can be used in RLS + ...toPairs(batch.context).map( + ([key, value]) => + prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, + ), + ...batch.requests.map((request) => + request.query(request.args as unknown[]), + ), + // Switch role back to admin user + prisma.$queryRawUnsafe("SET ROLE none"), + ]) + .then((results) => { + const n = toPairs(batch.context).length + 1; + const slicedResults = results.slice(0, n - 1); + slicedResults.forEach((result, index) => { + batch.requests[index].resolve(result); + }); + delete batches[key]; + }) + .catch((e) => { + batch.requests.forEach((request) => request.reject(e)); + delete batches[key]; + }); + } + }; + const client = prisma.$extends({ name: "Yates client", query: { @@ -468,64 +514,34 @@ export const createClient = ( } try { - const txId = - ctx.transactionId ?? - hashWithPrefix("yates_tx_", JSON.stringify(ctx)); - - // @ts-ignore - if (!globalThis.txIdSet) { - // @ts-ignore - globalThis.txIdSet = new Set(); + const txId = hashWithPrefix("yates_tx_", JSON.stringify(ctx)); + + const hash = txId; + if (!batches[hash]) { + batches[hash] = { + pgRole, + context, + requests: [], + }; + + // make sure, that we only tick once at a time + if (tickActive) { + tickActive = true; + process.nextTick(() => { + dispatchBatches(); + tickActive = false; + }); + } } - let txInitiatedInTick = false; - - // @ts-ignore - if (globalThis.txIdSet.has(txId)) { - txInitiatedInTick = true; - } else { - // @ts-ignore - globalThis.txIdSet.add(txId); - process.nextTick(() => { - // @ts-ignore - globalThis.txIdSet.delete(txId); + return new Promise((resolve, reject) => { + batches[hash].requests.push({ + query, + args, + resolve, + reject, }); - } - if (txInitiatedInTick) { - // @ts-ignore - const results = await prisma.$transaction([query(args)], { - maxWait: txMaxWait, - timeout: txTimeout, - new_tx_id: txId, - }); - return results.pop(); - } - // Because batch transactions inside a prisma client query extension can run out of order if used with async middleware, - // we need to run the logic inside an interactive transaction, however this brings a different set of problems in that the - // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. - // See https://github.com/prisma/prisma/issues/18276 - // @ts-ignore - const results = await prisma.$transaction( - [ - // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE - prisma.$queryRawUnsafe(`SET ROLE ${pgRole}`), - // Now set all the context variables using `set_config` so that they can be used in RLS - ...toPairs(context).map( - ([key, value]) => - prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, - ), - // Inconveniently, the `query` function will not run inside an interactive transaction. - // We need to manually reconstruct the query, and attached the "secret" transaction ID. - // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied - query(args), - ], - { - maxWait: txMaxWait, - timeout: txTimeout, - new_tx_id: txId, - }, - ); - return results.pop(); + }); } catch (e) { // Normalize RLS errors to make them a bit more readable. if (