Skip to content

Commit

Permalink
feat(observability): fix bugs found from product review + negative cases
Browse files Browse the repository at this point in the history
This change adds recording of retry span annotations,
catching cases in which exceptions where thrown but spans
were not ended while testing out and visually confirming the results.
  • Loading branch information
odeke-em committed Oct 12, 2024
1 parent 0342e74 commit 62bf905
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 78 deletions.
117 changes: 68 additions & 49 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,10 @@ class Database extends common.GrpcServiceObject {
this.runStream(query, options)
.on('error', err => {
setSpanError(span, err);
console.log(
`\x1b[34mDatabase.run.error: ${err} isRecording: ${span.isRecording()}\x1b[00m`
);
span.end();
callback!(err as grpc.ServiceError, rows, stats, metadata);
})
.on('response', response => {
Expand Down Expand Up @@ -3060,7 +3064,7 @@ class Database extends common.GrpcServiceObject {
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
setSpanError(span, err);
setSpanErrorAndException(span, err as Error);

if (
!dataReceived &&
Expand Down Expand Up @@ -3222,8 +3226,8 @@ class Database extends common.GrpcServiceObject {
span.addEvent('No session available', {
'session.id': session?.id,
});
this.runTransaction(options, runFn!);
span.end();
this.runTransaction(options, runFn!);
return;
}

Expand All @@ -3242,8 +3246,8 @@ class Database extends common.GrpcServiceObject {
}

const release = () => {
span.end();
this.pool_.release(session!);
span.end();
};

const runner = new TransactionRunner(
Expand All @@ -3253,28 +3257,34 @@ class Database extends common.GrpcServiceObject {
if (err) {
setSpanError(span, err!);
}
span.end();
runFn!(err, resp);
},
options
);

runner.run().then(release, err => {
if (err) {
setSpanError(span, err!);
}
setSpanError(span, err);

if (isSessionNotFoundError(err)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
span.addEvent('Retrying');
release();
this.runTransaction(options, runFn!);
this.runTransaction(
options,
(
err: ServiceError | null,
txn: Transaction | null | undefined
) => {
runFn!(err, txn);
}
);
} else {
if (!err) {
span.addEvent('Using Session', {'session.id': session!.id});
}
setImmediate(runFn!, err);
span.addEvent('Using Session', {'session.id': session!.id});
setImmediate((err: null | ServiceError) => {
runFn!(err);
}, err);
release();
}
});
Expand Down Expand Up @@ -3363,46 +3373,55 @@ class Database extends common.GrpcServiceObject {

let sessionId = '';
const getSession = this.pool_.getSession.bind(this.pool_);
const span = getActiveOrNoopSpan();
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
return await runner.run();
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
throw e;
return startTrace(
'Database.runTransactionAsync',
this._traceConfig,
span => {
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
const result = await runner.run();
span.end();
return result;
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
setSpanErrorAndException(span, e as Error);
throw e;
}
}
}
}
}
);
}

/**
Expand Down
36 changes: 31 additions & 5 deletions src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ function ensureInitialContextManagerSet() {
}
}

const debugTraces = process.env.SPANNER_DEBUG_TRACES === 'true';

/**
* startTrace begins an active span in the current active context
* and passes it back to the set callback function. Each span will
Expand All @@ -142,6 +144,10 @@ export function startTrace<T>(
SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix,
{kind: SpanKind.CLIENT},
span => {
if (debugTraces) {
patchSpanEndForDebugging(span);
}

span.setAttribute(SEMATTRS_DB_SYSTEM, 'spanner');
span.setAttribute(ATTR_OTEL_SCOPE_NAME, TRACER_NAME);
span.setAttribute(ATTR_OTEL_SCOPE_VERSION, TRACER_VERSION);
Expand All @@ -165,11 +171,20 @@ export function startTrace<T>(
}
}

if (config.that) {
const fn = cb.bind(config.that);
return fn(span);
} else {
return cb(span);
// If at all the invoked function throws an exception,
// record the exception and then end this span.
try {
if (config.that) {
const fn = cb.bind(config.that);
return fn(span);
} else {
return cb(span);
}
} catch (e) {
setSpanErrorAndException(span, e as Error);
span.end();
// Finally re-throw the exception.
throw e;
}
}
);
Expand Down Expand Up @@ -289,3 +304,14 @@ class noopSpan implements Span {
return this;
}
}

function patchSpanEndForDebugging(span: Span) {
const origSpanEnd = span.end;
const wrapSpanEnd = function (this: Span) {
console.trace(`\x1b[35m${spanNameSuffix}.end()\x1b[00m`);
return origSpanEnd.apply(this);
};
Object.defineProperty(span, 'end', {
value: wrapSpanEnd,
});
}
34 changes: 28 additions & 6 deletions src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {isSessionNotFoundError} from './session-pool';
import {Database} from './database';
import {google} from '../protos/protos';
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const jsonProtos = require('../protos/protos.json');
Expand Down Expand Up @@ -224,6 +225,7 @@ export abstract class Runner<T> {
async run(): Promise<T> {
const start = Date.now();
const timeout = this.options.timeout!;
const span = getActiveOrNoopSpan();

let lastError: grpc.ServiceError;

Expand All @@ -233,8 +235,18 @@ export abstract class Runner<T> {
const transaction = await this.getTransaction();

try {
return await this._run(transaction);
const result = await this._run(transaction);
if (this.attempts > 0) {
// No add to annotate if the transaction wasn't retried.
span.addEvent('Transaction Attempt Succeeded', {
attempt: this.attempts + 1,
});
}
return result;
} catch (e) {
span.addEvent('Transaction Attempt Failed', {
attempt: this.attempts + 1,
});
this.session.lastError = e as grpc.ServiceError;
lastError = e as grpc.ServiceError;
}
Expand All @@ -243,19 +255,29 @@ export abstract class Runner<T> {
// thrown here. We do this to bubble this error up to the caller who is
// responsible for retrying the transaction on a different session.
if (
!RETRYABLE.includes(lastError.code!) &&
!isRetryableInternalError(lastError)
!RETRYABLE.includes(lastError!.code!) &&
!isRetryableInternalError(lastError!)
) {
throw lastError;
span.addEvent('Transaction Attempt Aborted', {
attempt: this.attempts + 1,
});
setSpanErrorAndException(span, lastError!);
throw lastError!;
}

this.attempts += 1;

const delay = this.getNextDelay(lastError);
const delay = this.getNextDelay(lastError!);
span.addEvent('Backing off', {delay: delay, attempt: this.attempts});
await new Promise(resolve => setTimeout(resolve, delay));
}

throw new DeadlineError(lastError!);
span.addEvent('Transaction Attempt Aborted due to Deadline Error', {
total_attempts: this.attempts + 1,
});
const err = new DeadlineError(lastError!);
setSpanErrorAndException(span, err);
throw err;
}
}

Expand Down
37 changes: 19 additions & 18 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,6 @@ export class Snapshot extends EventEmitter {
this.begin();
}
setSpanError(span, err);
})
.on('end', err => {
if (err) {
setSpanError(span, err);
}
span.end();
});

if (resultStream instanceof Stream) {
Expand Down Expand Up @@ -1288,7 +1282,6 @@ export class Snapshot extends EventEmitter {
} catch (e) {
const errorStream = new PassThrough();
setSpanErrorAndException(span, e as Error);
span.end();
setImmediate(() => errorStream.destroy(e as Error));
return errorStream;
}
Expand Down Expand Up @@ -1332,12 +1325,6 @@ export class Snapshot extends EventEmitter {
) {
this.begin();
}
})
.on('end', err => {
if (err) {
setSpanError(span, err as Error);
}
span.end();
});

if (resultStream instanceof Stream) {
Expand Down Expand Up @@ -2112,15 +2099,29 @@ export class Transaction extends Dml {
} else if (!this._useInRunner) {
reqOpts.singleUseTransaction = this._options;
} else {
this.begin().then(() => {
this.commit(options, (err, resp) => {
if (err) {
this.begin()
.then(
() => {
this.commit(options, (err, resp) => {
if (err) {
setSpanError(span, err);
}
span.end();
callback(err, resp);
});
},
err => {
setSpanError(span, err);
callback(err);
span.end();
}
)
.catch(err => {
setSpanErrorAndException(span, err as Error);
span.end();
callback(err, resp);
// Re-throw the exception after recording it.
throw err;
});
}, callback);
return;
}

Expand Down

0 comments on commit 62bf905

Please sign in to comment.