Skip to content

Commit

Permalink
feat: inline BeginTransaction with first statement (#1692)
Browse files Browse the repository at this point in the history
* feat: inline BeginTransaction with first statement

* fix: prevent multiple begin transactions happen during a race condition happens; make sure stream requests uses a transaction id from the first response even if the stream fails halfway with an UNAVAILABLE error

* fix: minor

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: call explicit begin transaction only after the first attempt.

* feat: add inline begin transaction for batch DML requests

* fix: lint

* fix: explicit begin transaction for blind commit if a transaction runs over the runner.

* feat: adding more unit tests

* fix: explicit begin transaction for unknown error

* fix: format

* fix: tests after merge

* Lint fix

* fix: lint

* fix: system test

* fix: system emulator test

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: surbhigarg92 <surbhigarg.92@gmail.com>
  • Loading branch information
3 people authored Nov 7, 2022
1 parent 8341b1f commit d1b95d2
Show file tree
Hide file tree
Showing 10 changed files with 475 additions and 131 deletions.
1 change: 0 additions & 1 deletion src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
const transaction = session.transaction(
(session.parent as Database).queryOptions_
);
await transaction.begin();
session.txn = transaction;
}

Expand Down
5 changes: 4 additions & 1 deletion src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ export abstract class Runner<T> {
this.attempts = 0;
this.session = session;
this.transaction = transaction;
this.transaction.useInRunner();

const defaults = {timeout: 3600000};

Expand Down Expand Up @@ -194,7 +195,9 @@ export abstract class Runner<T> {
const transaction = this.session.transaction(
(this.session.parent as Database).queryOptions_
);
await transaction.begin();
if (this.attempts > 0) {
await transaction.begin();
}
return transaction;
}
/**
Expand Down
130 changes: 112 additions & 18 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ export type CommitCallback =
export class Snapshot extends EventEmitter {
protected _options!: spannerClient.spanner.v1.ITransactionOptions;
protected _seqno = 1;
protected _idWaiter: Readable;
protected _inlineBeginStarted;
protected _useInRunner = false;
id?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
Expand Down Expand Up @@ -289,6 +292,10 @@ export class Snapshot extends EventEmitter {
this.resourceHeader_ = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
};
this._idWaiter = new Readable({
read() {},
});
this._inlineBeginStarted = false;
}

/**
Expand Down Expand Up @@ -378,17 +385,7 @@ export class Snapshot extends EventEmitter {
callback!(err, resp);
return;
}

const {id, readTimestamp} = resp;

this.id = id!;
this.metadata = resp;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
}

this._update(resp);
callback!(null, resp);
}
);
Expand Down Expand Up @@ -573,6 +570,8 @@ export class Snapshot extends EventEmitter {

if (this.id) {
transaction.id = this.id as Uint8Array;
} else if (this._options.readWrite) {
transaction.begin = this._options;
} else {
transaction.singleUse = this._options;
}
Expand Down Expand Up @@ -603,6 +602,10 @@ export class Snapshot extends EventEmitter {
);

const makeRequest = (resumeToken?: ResumeToken): Readable => {
if (this.id && transaction.begin) {
delete transaction.begin;
transaction.id = this.id;
}
return this.requestStream({
client: 'SpannerClient',
method: 'streamingRead',
Expand All @@ -612,11 +615,21 @@ export class Snapshot extends EventEmitter {
});
};

return partialResultStream(makeRequest, {
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
json,
jsonOptions,
maxResumeRetries,
});
})
?.on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
})
.on('error', () => {
if (!this.id && this._useInRunner) {
this.begin();
}
});
}

/**
Expand Down Expand Up @@ -909,6 +922,9 @@ export class Snapshot extends EventEmitter {
.on('response', response => {
if (response.metadata) {
metadata = response.metadata;
if (metadata.transaction && !this.id) {
this._update(metadata.transaction);
}
}
})
.on('data', row => rows.push(row))
Expand Down Expand Up @@ -1034,6 +1050,8 @@ export class Snapshot extends EventEmitter {
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
if (this.id) {
transaction.id = this.id as Uint8Array;
} else if (this._options.readWrite) {
transaction.begin = this._options;
} else {
transaction.singleUse = this._options;
}
Expand All @@ -1059,7 +1077,7 @@ export class Snapshot extends EventEmitter {
};

const makeRequest = (resumeToken?: ResumeToken): Readable => {
if (!reqOpts) {
if (!reqOpts || (this.id && !reqOpts.transaction.id)) {
try {
sanitizeRequest();
} catch (e) {
Expand All @@ -1078,11 +1096,21 @@ export class Snapshot extends EventEmitter {
});
};

return partialResultStream(makeRequest, {
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
json,
jsonOptions,
maxResumeRetries,
});
})
.on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
})
.on('error', () => {
if (!this.id && this._useInRunner) {
this.begin();
}
});
}

/**
Expand Down Expand Up @@ -1226,6 +1254,51 @@ export class Snapshot extends EventEmitter {

return {params, paramTypes};
}

/**
* Update transaction properties from the response.
*
* @private
*
* @param {spannerClient.spanner.v1.ITransaction} resp Response object.
*/
protected _update(resp: spannerClient.spanner.v1.ITransaction): void {
const {id, readTimestamp} = resp;

this.id = id!;
this.metadata = resp;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
}
this._idWaiter.emit('notify');
}

/**
* Wrap `makeRequest` function with the lock to make sure the inline begin
* transaction can happen only once.
*
* @param makeRequest
* @private
*/
private _wrapWithIdWaiter(
makeRequest: (resumeToken?: ResumeToken) => Readable
): (resumeToken?: ResumeToken) => Readable {
if (this.id || !this._options.readWrite) {
return makeRequest;
}
if (!this._inlineBeginStarted) {
this._inlineBeginStarted = true;
return makeRequest;
}
return (resumeToken?: ResumeToken): Readable =>
this._idWaiter.once('notify', () =>
makeRequest(resumeToken)
.on('data', chunk => this._idWaiter.emit('data', chunk))
.once('end', () => this._idWaiter.emit('end'))
);
}
}

/*! Developer Documentation
Expand Down Expand Up @@ -1528,14 +1601,20 @@ export class Transaction extends Dml {
return {sql, params, paramTypes};
});

const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
if (this.id) {
transaction.id = this.id as Uint8Array;
} else {
transaction.begin = this._options;
}
const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = {
session: this.session.formattedName_!,
requestOptions: this.configureTagOptions(
false,
this.requestOptions?.transactionTag ?? undefined,
(options as BatchUpdateOptions).requestOptions
),
transaction: {id: this.id!},
transaction,
seqno: this._seqno++,
statements,
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;
Expand All @@ -1562,6 +1641,11 @@ export class Transaction extends Dml {
}

const {resultSets, status} = resp;
for (const resultSet of resultSets) {
if (!this.id && resultSet.metadata?.transaction) {
this._update(resultSet.metadata.transaction);
}
}
const rowCounts: number[] = resultSets.map(({stats}) => {
return (
(stats &&
Expand Down Expand Up @@ -1686,8 +1770,11 @@ export class Transaction extends Dml {

if (this.id) {
reqOpts.transactionId = this.id as Uint8Array;
} else {
} else if (!this._useInRunner) {
reqOpts.singleUseTransaction = this._options;
} else {
this.begin().then(() => this.commit(options, callback));
return;
}

if (
Expand Down Expand Up @@ -2184,6 +2271,13 @@ export class Transaction extends Dml {
const unique = new Set(allKeys);
return Array.from(unique).sort();
}

/**
* Mark transaction as started from the runner.
*/
useInRunner(): void {
this._useInRunner = true;
}
}

/*! Developer Documentation
Expand Down
9 changes: 5 additions & 4 deletions system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6799,10 +6799,11 @@ describe('Spanner', () => {
NumberValue: 0,
};

beforeEach(() => {
return googleSqlTable.update(defaultRowValues).then(() => {
postgreSqlTable.update(defaultRowValues);
});
beforeEach(async () => {
await googleSqlTable.update(defaultRowValues);
if (!IS_EMULATOR_ENABLED) {
await postgreSqlTable.update(defaultRowValues);
}
});

const readConcurrentTransaction = (done, database, table) => {
Expand Down
10 changes: 0 additions & 10 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ class FakeSession {
}
}

interface ReadSessionCallback {
(err: Error, session?: null): void;
(err: null, session: FakeSession): void;
}

interface WriteSessionCallback {
(err: Error, session?: null, transaction?: null): void;
(err: null, session: FakeSession, transaction: FakeTransaction): void;
}

class FakeSessionPool extends EventEmitter {
calledWith_: IArguments;
constructor() {
Expand Down
Loading

0 comments on commit d1b95d2

Please sign in to comment.