Skip to content

Commit

Permalink
feat(spanner): add support for change streams transaction exclusion o…
Browse files Browse the repository at this point in the history
…ption (#2049)

Add support for excluding transactions from being recorded in the change streams by passing a new boolean option ExcludeTxnFromChangeStreams in the various write APIs:

`runTransaction`
`getTransaction`
`runPartitionedUpdate`
`_mutate`

Note: Samples will be added later in separate prs.
  • Loading branch information
alkatrivedi authored May 24, 2024
1 parent 628f4b0 commit d95cab5
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 13 deletions.
26 changes: 22 additions & 4 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ export interface SetIamPolicyRequest {
updateMask?: FieldMask | null;
}

export interface RunPartitionedUpdateOptions extends ExecuteSqlRequest {
excludeTxnFromChangeStreams?: boolean;
}

export type UpdateSchemaCallback = ResourceCallback<
GaxOperation,
databaseAdmin.longrunning.IOperation
Expand Down Expand Up @@ -2092,6 +2096,9 @@ class Database extends common.GrpcServiceObject {
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}
if (!err) {
this._releaseOnEnd(session!, transaction!);
}
Expand Down Expand Up @@ -2711,13 +2718,15 @@ class Database extends common.GrpcServiceObject {
* @param {RunUpdateCallback} [callback] Callback function.
* @returns {Promise<RunUpdateResponse>}
*/
runPartitionedUpdate(query: string | ExecuteSqlRequest): Promise<[number]>;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions
): Promise<[number]>;
runPartitionedUpdate(
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<[number]> {
this.pool_.getSession((err, session) => {
Expand All @@ -2732,11 +2741,14 @@ class Database extends common.GrpcServiceObject {

_runPartitionedUpdate(
session: Session,
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<number> {
const transaction = session.partitionedDml();

if (typeof query !== 'string' && query.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
transaction.begin(err => {
if (err) {
this.pool_.release(session!);
Expand Down Expand Up @@ -3059,6 +3071,9 @@ class Database extends common.GrpcServiceObject {
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}

const release = this.pool_.release.bind(this.pool_, session!);
const runner = new TransactionRunner(
Expand Down Expand Up @@ -3173,6 +3188,9 @@ class Database extends common.GrpcServiceObject {
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
Expand Down
29 changes: 21 additions & 8 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export type DropTableCallback = UpdateSchemaCallback;

interface MutateRowsOptions extends CommitOptions {
requestOptions?: Omit<IRequestOptions, 'requestTag'>;
excludeTxnFromChangeStreams?: boolean;
}

export type DeleteRowsCallback = CommitCallback;
Expand Down Expand Up @@ -1073,15 +1074,27 @@ class Table {
): void {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};
this.database.runTransaction({requestOptions}, (err, transaction) => {
if (err) {
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
});
const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;

this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
}
);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface RunTransactionOptions {
timeout?: number;
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
optimisticLock?: boolean;
excludeTxnFromChangeStreams?: boolean;
}

/**
Expand Down Expand Up @@ -204,6 +205,9 @@ export abstract class Runner<T> {
if (this.options.optimisticLock) {
transaction.useOptimisticLock();
}
if (this.options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
if (this.attempts > 0) {
await transaction.begin();
}
Expand Down
23 changes: 23 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,18 @@ export class Transaction extends Dml {
useOptimisticLock(): void {
this._options.readWrite!.readLockMode = ReadLockMode.OPTIMISTIC;
}

/**
* Use option excludeTxnFromChangeStreams to exclude read/write transactions
* from being tracked in change streams.
*
* Enabling this options to true will effectively disable change stream tracking
* for a specified transaction, allowing read/write transaction to operate without being
* included in change streams.
*/
excludeTxnFromChangeStreams(): void {
this._options.excludeTxnFromChangeStreams = true;
}
}

/*! Developer Documentation
Expand Down Expand Up @@ -2503,6 +2515,17 @@ export class PartitionedDml extends Dml {
super(session);
this._options = {partitionedDml: options};
}
/**
* Use option excludeTxnFromChangeStreams to exclude partitionedDml
* queries from being tracked in change streams.
*
* Enabling this options to true will effectively disable change stream tracking
* for a specified partitionedDml query, allowing write queries to operate
* without being included in change streams.
*/
excludeTxnFromChangeStreams(): void {
this._options.excludeTxnFromChangeStreams = true;
}

/**
* Execute a DML statement and get the affected row count. Unlike
Expand Down
21 changes: 20 additions & 1 deletion test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2544,7 +2544,8 @@ describe('Database', () => {

const [query] = runUpdateStub.lastCall.args;

assert.strictEqual(query, QUERY);
assert.strictEqual(query.sql, QUERY.sql);
assert.deepStrictEqual(query.params, QUERY.params);
assert.ok(fakeCallback.calledOnce);
});

Expand Down Expand Up @@ -2581,6 +2582,24 @@ describe('Database', () => {
assert.ok(fakeCallback.calledOnce);
});

it('should accept excludeTxnFromChangeStreams', () => {
const fakeCallback = sandbox.spy();

database.runPartitionedUpdate(
{
excludeTxnFromChangeStream: true,
},
fakeCallback
);

const [query] = runUpdateStub.lastCall.args;

assert.deepStrictEqual(query, {
excludeTxnFromChangeStream: true,
});
assert.ok(fakeCallback.calledOnce);
});

it('should ignore directedReadOptions set for client', () => {
const fakeCallback = sandbox.spy();

Expand Down
Loading

0 comments on commit d95cab5

Please sign in to comment.