Skip to content

Commit 2a19ef1

Browse files
authored
feat(observability): trace Database.batchWriteAtLeastOnce (#2157)
This change traces Database.batchWriteAtLeastOnce. Sadly though MockSpanner doesn't yet support batch writes hence no end-to-end test with it. Updates #2079
1 parent f01516e commit 2a19ef1

File tree

2 files changed

+295
-59
lines changed

2 files changed

+295
-59
lines changed

observability-test/database.ts

Lines changed: 225 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import {EventEmitter} from 'events';
2121
import * as assert from 'assert';
2222
import * as extend from 'extend';
2323
import {google} from '../protos/protos';
24-
import {CommitCallback, CommitOptions, MutationSet} from '../src/transaction';
24+
import {
25+
BatchWriteOptions,
26+
CommitCallback,
27+
CommitOptions,
28+
MutationSet,
29+
} from '../src/transaction';
2530
import {util} from '@google-cloud/common';
2631
import {Transform} from 'stream';
2732
import * as proxyquire from 'proxyquire';
@@ -35,7 +40,7 @@ const {
3540
// eslint-disable-next-line n/no-extraneous-require
3641
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
3742
import * as db from '../src/database';
38-
import {Instance, Spanner} from '../src';
43+
import {Instance, MutationGroup, Spanner} from '../src';
3944
import * as pfy from '@google-cloud/promisify';
4045
import {grpc} from 'google-gax';
4146
import {MockError} from '../test/mockserver/mockspanner';
@@ -1215,6 +1220,224 @@ describe('Database', () => {
12151220
});
12161221
});
12171222

1223+
describe('batchWriteAtLeastOnce', () => {
1224+
const mutationGroup1 = new MutationGroup();
1225+
mutationGroup1.insert('MyTable', {
1226+
Key: 'ks1',
1227+
Thing: 'abc',
1228+
});
1229+
const mutationGroup2 = new MutationGroup();
1230+
mutationGroup2.insert('MyTable', {
1231+
Key: 'ks2',
1232+
Thing: 'xyz',
1233+
});
1234+
1235+
const mutationGroups = [mutationGroup1, mutationGroup2];
1236+
1237+
let fakePool: FakeSessionPool;
1238+
let fakeSession: FakeSession;
1239+
let fakeDataStream: Transform;
1240+
let getSessionStub: sinon.SinonStub;
1241+
let requestStreamStub: sinon.SinonStub;
1242+
1243+
const options = {
1244+
requestOptions: {
1245+
transactionTag: 'batch-write-tag',
1246+
},
1247+
excludeTxnFromChangeStream: true,
1248+
gaxOptions: {autoPaginate: false},
1249+
} as BatchWriteOptions;
1250+
1251+
beforeEach(() => {
1252+
fakePool = database.pool_;
1253+
fakeSession = new FakeSession();
1254+
fakeDataStream = through.obj();
1255+
1256+
getSessionStub = (
1257+
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
1258+
).callsFake(callback => callback(null, fakeSession));
1259+
1260+
requestStreamStub = sandbox
1261+
.stub(database, 'requestStream')
1262+
.returns(fakeDataStream);
1263+
});
1264+
1265+
it('on retry with "Session not found" error', done => {
1266+
const sessionNotFoundError = {
1267+
code: grpc.status.NOT_FOUND,
1268+
message: 'Session not found',
1269+
} as grpc.ServiceError;
1270+
let retryCount = 0;
1271+
1272+
database
1273+
.batchWriteAtLeastOnce(mutationGroups, options)
1274+
.on('data', () => {})
1275+
.on('error', err => {
1276+
assert.fail(err);
1277+
})
1278+
.on('end', () => {
1279+
assert.strictEqual(retryCount, 1);
1280+
1281+
const spans = traceExporter.getFinishedSpans();
1282+
withAllSpansHaveDBName(spans);
1283+
1284+
const actualSpanNames: string[] = [];
1285+
const actualEventNames: string[] = [];
1286+
spans.forEach(span => {
1287+
actualSpanNames.push(span.name);
1288+
span.events.forEach(event => {
1289+
actualEventNames.push(event.name);
1290+
});
1291+
});
1292+
1293+
const expectedSpanNames = [
1294+
'CloudSpanner.Database.batchWriteAtLeastOnce',
1295+
'CloudSpanner.Database.batchWriteAtLeastOnce',
1296+
];
1297+
assert.deepStrictEqual(
1298+
actualSpanNames,
1299+
expectedSpanNames,
1300+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
1301+
);
1302+
1303+
// Ensure that the span actually produced an error that was recorded.
1304+
const firstSpan = spans[0];
1305+
assert.strictEqual(
1306+
SpanStatusCode.ERROR,
1307+
firstSpan.status.code,
1308+
'Expected an ERROR span status'
1309+
);
1310+
1311+
const errorMessage = firstSpan.status.message;
1312+
assert.deepStrictEqual(
1313+
firstSpan.status.message,
1314+
sessionNotFoundError.message
1315+
);
1316+
1317+
// The last span should not have an error status.
1318+
const lastSpan = spans[spans.length - 1];
1319+
assert.strictEqual(
1320+
SpanStatusCode.UNSET,
1321+
lastSpan.status.code,
1322+
'Unexpected span status'
1323+
);
1324+
1325+
assert.deepStrictEqual(lastSpan.status.message, undefined);
1326+
1327+
const expectedEventNames = [
1328+
'Using Session',
1329+
'No session available',
1330+
'Using Session',
1331+
];
1332+
assert.deepStrictEqual(actualEventNames, expectedEventNames);
1333+
1334+
done();
1335+
});
1336+
1337+
fakeDataStream.emit('error', sessionNotFoundError);
1338+
retryCount++;
1339+
});
1340+
1341+
it('on getSession errors', done => {
1342+
const fakeError = new Error('err');
1343+
1344+
getSessionStub.callsFake(callback => callback(fakeError));
1345+
database
1346+
.batchWriteAtLeastOnce(mutationGroups, options)
1347+
.on('error', err => {
1348+
assert.strictEqual(err, fakeError);
1349+
1350+
const spans = traceExporter.getFinishedSpans();
1351+
withAllSpansHaveDBName(spans);
1352+
1353+
const actualSpanNames: string[] = [];
1354+
const actualEventNames: string[] = [];
1355+
spans.forEach(span => {
1356+
actualSpanNames.push(span.name);
1357+
span.events.forEach(event => {
1358+
actualEventNames.push(event.name);
1359+
});
1360+
});
1361+
1362+
const expectedSpanNames = [
1363+
'CloudSpanner.Database.batchWriteAtLeastOnce',
1364+
];
1365+
assert.deepStrictEqual(
1366+
actualSpanNames,
1367+
expectedSpanNames,
1368+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
1369+
);
1370+
1371+
// Ensure that the span actually produced an error that was recorded.
1372+
const firstSpan = spans[0];
1373+
assert.strictEqual(
1374+
SpanStatusCode.ERROR,
1375+
firstSpan.status.code,
1376+
'Expected an ERROR span status'
1377+
);
1378+
1379+
assert.deepStrictEqual(firstSpan.status.message, fakeError.message);
1380+
1381+
const expectedEventNames = [];
1382+
assert.deepStrictEqual(expectedEventNames, actualEventNames);
1383+
1384+
done();
1385+
});
1386+
});
1387+
1388+
it('with no errors', done => {
1389+
getSessionStub.callsFake(callback => callback(null, {}));
1390+
database
1391+
.batchWriteAtLeastOnce(mutationGroups, options)
1392+
.on('data', () => {})
1393+
.on('error', assert.ifError)
1394+
.on('end', () => {
1395+
const spans = traceExporter.getFinishedSpans();
1396+
withAllSpansHaveDBName(spans);
1397+
1398+
const actualSpanNames: string[] = [];
1399+
const actualEventNames: string[] = [];
1400+
spans.forEach(span => {
1401+
actualSpanNames.push(span.name);
1402+
span.events.forEach(event => {
1403+
actualEventNames.push(event.name);
1404+
});
1405+
});
1406+
1407+
const expectedSpanNames = [
1408+
'CloudSpanner.Database.batchWriteAtLeastOnce',
1409+
];
1410+
assert.deepStrictEqual(
1411+
actualSpanNames,
1412+
expectedSpanNames,
1413+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
1414+
);
1415+
1416+
// Ensure that the span actually produced an error that was recorded.
1417+
const firstSpan = spans[0];
1418+
assert.strictEqual(
1419+
SpanStatusCode.UNSET,
1420+
firstSpan.status.code,
1421+
'Unexpected span status code'
1422+
);
1423+
1424+
assert.strictEqual(
1425+
undefined,
1426+
firstSpan.status.message,
1427+
'Unexpected span status message'
1428+
);
1429+
1430+
const expectedEventNames = ['Using Session'];
1431+
assert.deepStrictEqual(actualEventNames, expectedEventNames);
1432+
1433+
done();
1434+
});
1435+
1436+
fakeDataStream.emit('data', 'response');
1437+
fakeDataStream.end('end');
1438+
});
1439+
});
1440+
12181441
describe('runTransaction', () => {
12191442
const SESSION = new FakeSession();
12201443
const TRANSACTION = new FakeTransaction(

src/database.ts

Lines changed: 70 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3469,66 +3469,79 @@ class Database extends common.GrpcServiceObject {
34693469
): NodeJS.ReadableStream {
34703470
const proxyStream: Transform = through.obj();
34713471

3472-
const span = getActiveOrNoopSpan();
3473-
3474-
this.pool_.getSession((err, session) => {
3475-
if (err) {
3476-
proxyStream.destroy(err);
3477-
return;
3478-
}
3479-
3480-
span.addEvent('Using Session', {'session.id': session?.id});
3481-
const gaxOpts = extend(true, {}, options?.gaxOptions);
3482-
const reqOpts = Object.assign(
3483-
{} as spannerClient.spanner.v1.BatchWriteRequest,
3484-
{
3485-
session: session!.formattedName_!,
3486-
mutationGroups: mutationGroups.map(mg => mg.proto()),
3487-
requestOptions: options?.requestOptions,
3488-
excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams,
3489-
}
3490-
);
3491-
let dataReceived = false;
3492-
let dataStream = this.requestStream({
3493-
client: 'SpannerClient',
3494-
method: 'batchWrite',
3495-
reqOpts,
3496-
gaxOpts,
3497-
headers: this.resourceHeader_,
3498-
});
3499-
dataStream
3500-
.once('data', () => (dataReceived = true))
3501-
.once('error', err => {
3502-
if (
3503-
!dataReceived &&
3504-
isSessionNotFoundError(err as grpc.ServiceError)
3505-
) {
3506-
// If there's a 'Session not found' error and we have not yet received
3507-
// any data, we can safely retry the writes on a new session.
3508-
// Register the error on the session so the pool can discard it.
3509-
if (session) {
3510-
session.lastError = err as grpc.ServiceError;
3511-
}
3512-
span.addEvent('No session available', {
3513-
'session.id': session?.id,
3514-
});
3515-
// Remove the current data stream from the end user stream.
3516-
dataStream.unpipe(proxyStream);
3517-
dataStream.end();
3518-
// Create a new stream and add it to the end user stream.
3519-
dataStream = this.batchWriteAtLeastOnce(mutationGroups, options);
3520-
dataStream.pipe(proxyStream);
3521-
} else {
3472+
return startTrace(
3473+
'Database.batchWriteAtLeastOnce',
3474+
this._traceConfig,
3475+
span => {
3476+
this.pool_.getSession((err, session) => {
3477+
if (err) {
35223478
proxyStream.destroy(err);
3479+
setSpanError(span, err);
3480+
span.end();
3481+
return;
35233482
}
3524-
})
3525-
.once('end', () => {
3526-
this.pool_.release(session!);
3527-
})
3528-
.pipe(proxyStream);
3529-
});
35303483

3531-
return proxyStream as NodeJS.ReadableStream;
3484+
span.addEvent('Using Session', {'session.id': session?.id});
3485+
const gaxOpts = extend(true, {}, options?.gaxOptions);
3486+
const reqOpts = Object.assign(
3487+
{} as spannerClient.spanner.v1.BatchWriteRequest,
3488+
{
3489+
session: session!.formattedName_!,
3490+
mutationGroups: mutationGroups.map(mg => mg.proto()),
3491+
requestOptions: options?.requestOptions,
3492+
excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams,
3493+
}
3494+
);
3495+
let dataReceived = false;
3496+
let dataStream = this.requestStream({
3497+
client: 'SpannerClient',
3498+
method: 'batchWrite',
3499+
reqOpts,
3500+
gaxOpts,
3501+
headers: this.resourceHeader_,
3502+
});
3503+
dataStream
3504+
.once('data', () => (dataReceived = true))
3505+
.once('error', err => {
3506+
setSpanError(span, err);
3507+
3508+
if (
3509+
!dataReceived &&
3510+
isSessionNotFoundError(err as grpc.ServiceError)
3511+
) {
3512+
// If there's a 'Session not found' error and we have not yet received
3513+
// any data, we can safely retry the writes on a new session.
3514+
// Register the error on the session so the pool can discard it.
3515+
if (session) {
3516+
session.lastError = err as grpc.ServiceError;
3517+
}
3518+
span.addEvent('No session available', {
3519+
'session.id': session?.id,
3520+
});
3521+
// Remove the current data stream from the end user stream.
3522+
dataStream.unpipe(proxyStream);
3523+
dataStream.end();
3524+
// Create a new stream and add it to the end user stream.
3525+
dataStream = this.batchWriteAtLeastOnce(
3526+
mutationGroups,
3527+
options
3528+
);
3529+
dataStream.pipe(proxyStream);
3530+
} else {
3531+
span.end();
3532+
proxyStream.destroy(err);
3533+
}
3534+
})
3535+
.once('end', () => {
3536+
span.end();
3537+
this.pool_.release(session!);
3538+
})
3539+
.pipe(proxyStream);
3540+
});
3541+
3542+
return proxyStream as NodeJS.ReadableStream;
3543+
}
3544+
);
35323545
}
35333546

35343547
/**

0 commit comments

Comments
 (0)