Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
LucianBuzzo committed Nov 1, 2024
1 parent 6c56536 commit c3dadbc
Showing 1 changed file with 72 additions and 56 deletions.
128 changes: 72 additions & 56 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ const VALID_OPERATIONS = ["SELECT", "UPDATE", "INSERT", "DELETE"] as const;

const debug = logger("yates");

interface Batch {
pgRole: string;
context?: { [x: string]: string | number | string[] };
requests: Array<{
query: (args: unknown[]) => PrismaPromise<unknown>;
args: unknown;
resolve: (result: unknown) => void;
reject: (error: unknown) => void;
}>;
}

const BatchTxIdCounter = {
id: 0,
nextId() {
Expand Down Expand Up @@ -316,7 +327,7 @@ export const createClient = (
const { txMaxWait = 30000, txTimeout = 30000 } = options;

// @ts-ignore
(prisma as any)._requestHandler.batchBy = (n) => {
(prisma as any)._requestHandler.dataloader.options.batchBy = (n) => {
console.log("batch by yates id?", n.transaction?.yates_id);
console.log("pq", getBatchId(n.protocolQuery));
return n.transaction?.yates_id
Expand Down Expand Up @@ -411,6 +422,41 @@ export const createClient = (
return result;
};

let tickActive = false;
const batches: Record<string, Batch> = {};

const dispatchBatches = () => {
for (const [key, batch] of Object.entries(batches)) {
console.log(key, batch);
prisma
.$transaction([
prisma.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`),
// Now set all the context variables using `set_config` so that they can be used in RLS
...toPairs(batch.context).map(
([key, value]) =>
prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`,
),
...batch.requests.map((request) =>
request.query(request.args as unknown[]),
),
// Switch role back to admin user
prisma.$queryRawUnsafe("SET ROLE none"),
])
.then((results) => {
const n = toPairs(batch.context).length + 1;
const slicedResults = results.slice(0, n - 1);
slicedResults.forEach((result, index) => {
batch.requests[index].resolve(result);
});
delete batches[key];
})
.catch((e) => {
batch.requests.forEach((request) => request.reject(e));
delete batches[key];
});
}
};

const client = prisma.$extends({
name: "Yates client",
query: {
Expand Down Expand Up @@ -468,64 +514,34 @@ export const createClient = (
}

try {
const txId =
ctx.transactionId ??
hashWithPrefix("yates_tx_", JSON.stringify(ctx));

// @ts-ignore
if (!globalThis.txIdSet) {
// @ts-ignore
globalThis.txIdSet = new Set();
const txId = hashWithPrefix("yates_tx_", JSON.stringify(ctx));

const hash = txId;
if (!batches[hash]) {
batches[hash] = {
pgRole,
context,
requests: [],
};

// make sure, that we only tick once at a time
if (tickActive) {
tickActive = true;
process.nextTick(() => {
dispatchBatches();
tickActive = false;
});
}
}

let txInitiatedInTick = false;

// @ts-ignore
if (globalThis.txIdSet.has(txId)) {
txInitiatedInTick = true;
} else {
// @ts-ignore
globalThis.txIdSet.add(txId);
process.nextTick(() => {
// @ts-ignore
globalThis.txIdSet.delete(txId);
return new Promise((resolve, reject) => {
batches[hash].requests.push({
query,
args,
resolve,
reject,
});
}
if (txInitiatedInTick) {
// @ts-ignore
const results = await prisma.$transaction([query(args)], {
maxWait: txMaxWait,
timeout: txTimeout,
new_tx_id: txId,
});
return results.pop();
}
// Because batch transactions inside a prisma client query extension can run out of order if used with async middleware,
// we need to run the logic inside an interactive transaction, however this brings a different set of problems in that the
// main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request.
// See https://github.com/prisma/prisma/issues/18276
// @ts-ignore
const results = await prisma.$transaction(
[
// Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE
prisma.$queryRawUnsafe(`SET ROLE ${pgRole}`),
// Now set all the context variables using `set_config` so that they can be used in RLS
...toPairs(context).map(
([key, value]) =>
prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`,
),
// Inconveniently, the `query` function will not run inside an interactive transaction.
// We need to manually reconstruct the query, and attached the "secret" transaction ID.
// This ensures that the query will run inside the transaction AND that middlewares will not be re-applied
query(args),
],
{
maxWait: txMaxWait,
timeout: txTimeout,
new_tx_id: txId,
},
);
return results.pop();
});
} catch (e) {
// Normalize RLS errors to make them a bit more readable.
if (
Expand Down

0 comments on commit c3dadbc

Please sign in to comment.