Skip to content

Commit

Permalink
Added DBTransaction.getForUpdate to address write skew, and added tes…
Browse files Browse the repository at this point in the history
…t demonstrating race condition thrashing
  • Loading branch information
CMCDragonkai committed Jun 26, 2022
1 parent af0e92a commit 5bf710c
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/DBTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,40 @@ class DBTransaction {
return this._db.deserializeDecrypt<T>(data, raw as any);
}

/**
* Use this for to address write skews
*/
public async getForUpdate<T>(
keyPath: KeyPath | string | Buffer,
raw?: false,
): Promise<T | undefined>;
public async getForUpdate(
keyPath: KeyPath | string | Buffer,
raw: true,
): Promise<Buffer | undefined>;
@ready(new errors.ErrorDBTransactionDestroyed())
public async getForUpdate<T>(
keyPath: KeyPath | string | Buffer,
raw: boolean = false,
): Promise<T | Buffer | undefined> {
keyPath = utils.toKeyPath(keyPath);
keyPath = ['data', ...keyPath];
let data: Buffer;
try {
const key = utils.keyPathToKey(keyPath);
data = await rocksdbP.transactionGetForUpdate(this._transaction, key, {
valueEncoding: 'buffer',
snapshot: this.setupSnapshot(),
});
} catch (e) {
if (e.code === 'NOT_FOUND') {
return undefined;
}
throw e;
}
return this._db.deserializeDecrypt<T>(data, raw as any);
}

public async put(
keyPath: KeyPath | string | Buffer,
value: any,
Expand Down
96 changes: 96 additions & 0 deletions tests/DBTransaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import path from 'path';
import fs from 'fs';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { withF } from '@matrixai/resources';
import { Lock } from '@matrixai/async-locks';
import DB from '@/DB';
import DBTransaction from '@/DBTransaction';
import * as errors from '@/errors';
Expand Down Expand Up @@ -258,6 +259,101 @@ describe(DBTransaction.name, () => {
});
expect(await db.get('hello')).toBeUndefined();
});
test('getForUpdate addresses write-skew by promoting gets into same-value puts', async () => {
// Snapshot isolation allows write skew anomalies to occur
// A write skew means that 2 transactions concurrently read from overlapping keys
// then make disjoint updates to the keys, that breaks a consistency constraint on those keys
// For example:
// T1 reads from k1, k2, writes to k1
// T2 reads from k1, k2, writes to k2
// Where k1 + k2 >= 0
await db.put('balance1', '100');
await db.put('balance2', '100');
const t1 = withF([db.transaction()], async ([tran]) => {
let balance1 = parseInt((await tran.getForUpdate('balance1'))!);
const balance2 = parseInt((await tran.getForUpdate('balance2'))!);
balance1 -= 100;
expect(balance1 + balance2).toBeGreaterThanOrEqual(0);
await tran.put('balance1', balance1.toString());
});
const t2 = withF([db.transaction()], async ([tran]) => {
const balance1 = parseInt((await tran.getForUpdate('balance1'))!);
let balance2 = parseInt((await tran.getForUpdate('balance2'))!);
balance2 -= 100;
expect(balance1 + balance2).toBeGreaterThanOrEqual(0);
await tran.put('balance2', balance2.toString());
});
// By using getForUpdate, we promote the read to a write, where it writes the same value
// this causes a write-write conflict
const results = await Promise.allSettled([t1, t2]);
// One will succeed, one will fail
expect(results.some((result) => result.status === 'fulfilled')).toBe(true);
expect(
results.some((result) => {
return (
result.status === 'rejected' &&
result.reason instanceof errors.ErrorDBTransactionConflict
);
}),
).toBe(true);
});
test('PCC locking to prevent thrashing for racing counters', async () => {
await db.put('counter', '0');
let t1 = withF([db.transaction()], async ([tran]) => {
// Can also use `getForUpdate`, but a conflict exists even for `get`
let counter = parseInt((await tran.get('counter'))!);
counter++;
await tran.put('counter', counter.toString());
});
let t2 = withF([db.transaction()], async ([tran]) => {
// Can also use `getForUpdate`, but a conflict exists even for `get`
let counter = parseInt((await tran.get('counter'))!);
counter++;
await tran.put('counter', counter.toString());
});
let results = await Promise.allSettled([t1, t2]);
expect(results.some((result) => result.status === 'fulfilled')).toBe(true);
expect(
results.some((result) => {
return (
result.status === 'rejected' &&
result.reason instanceof errors.ErrorDBTransactionConflict
);
}),
).toBe(true);
expect(await db.get('counter')).toBe('1');
// In OCC, concurrent requests to update an atomic counter would result
// in race thrashing where only 1 request succeeds, and all other requests
// keep failing. The only way to prevent this thrashing is to use PCC locking
await db.put('counter', '0');
const l = new Lock();
t1 = l.withF(async () => {
await withF([db.transaction()], async ([tran]) => {
// Can also use `get`, no difference here
let counter = parseInt((await tran.getForUpdate('counter'))!);
counter++;
await tran.put('counter', counter.toString());
});
});
t2 = l.withF(async () => {
await withF([db.transaction()], async ([tran]) => {
// Can also use `get`, no difference here
let counter = parseInt((await tran.getForUpdate('counter'))!);
counter++;
await tran.put('counter', counter.toString());
});
});
results = await Promise.allSettled([t1, t2]);
expect(results.every((result) => result.status === 'fulfilled'));
expect(await db.get('counter')).toBe('2');
// The PCC locks must be done outside of transaction creation
// This is because the PCC locks enforce mutual exclusion between commit operations
// If the locks were done inside the transaction, it's possible for the commit operations
// to be delayed after all mutually exclusive callbacks are executed
// resulting in a DBTransactionConflict
// When this library gains native locking, it must deal with this problem
// by only releasing the locks when the transaction is committed or rollbacked
});
test('iterator get after delete consistency', async () => {
await db.put('hello', 'world');
let results: Array<[KeyPath, Buffer]> = [];
Expand Down

0 comments on commit 5bf710c

Please sign in to comment.