refactor: tracing using OpenTelemetry. (#2085)
* feat: tracing using OpenTelemetry.

* Fix typos: traceProvider -> tracerProvider.

* Address feedback.

* Address feedback.

* Address feedback.

* Add the missing caret (`^`) for new deps.
ehsannas authored Jul 29, 2024
1 parent f73e28b commit e326e41
Showing 18 changed files with 1,665 additions and 319 deletions.
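
The change routes the client's internal operations through OpenTelemetry spans (for example, the BulkWriter commit and partition-query paths shown below). As a rough, illustrative sketch that is not part of this diff, an application that wants to see those spans would set up a standard OpenTelemetry tracer provider before creating the Firestore client. The console exporter and global registration below are assumptions; exactly how the SDK discovers the provider is defined in the telemetry files of this change, which are not shown here.

    import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
    import {
      SimpleSpanProcessor,
      ConsoleSpanExporter,
    } from '@opentelemetry/sdk-trace-base';

    // Illustrative setup only: export spans to the console so the SDK's spans
    // (named by constants such as SPAN_NAME_BULK_WRITER_COMMIT below) become visible.
    const tracerProvider = new NodeTracerProvider();
    tracerProvider.addSpanProcessor(
      new SimpleSpanProcessor(new ConsoleSpanExporter())
    );
    // Register globally so libraries that call trace.getTracer(...) pick it up.
    tracerProvider.register();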
4 changes: 2 additions & 2 deletions .idea/runConfigurations/System_Test.xml

Some generated files are not rendered by default.

106 changes: 59 additions & 47 deletions dev/src/bulk-writer.ts
@@ -49,6 +49,10 @@ import {StatusCode} from './status-code';
// eslint-disable-next-line no-undef
import GrpcStatus = FirebaseFirestore.GrpcStatus;
import api = google.firestore.v1;
import {
ATTRIBUTE_KEY_DOC_COUNT,
SPAN_NAME_BULK_WRITER_COMMIT,
} from './telemetry/trace-util';

/*!
* The maximum number of writes that can be in a single batch.
@@ -243,55 +247,63 @@ class BulkCommitBatch extends WriteBatch {

The existing commit logic in BulkCommitBatch.bulkCommit is now wrapped in a trace span that records the batch's write count (ATTRIBUTE_KEY_DOC_COUNT) as a span attribute. The updated method:

  async bulkCommit(options: {requestTag?: string} = {}): Promise<void> {
    return this._firestore._traceUtil.startActiveSpan(
      SPAN_NAME_BULK_WRITER_COMMIT,
      async () => {
        const tag = options?.requestTag ?? requestTag();

        // Capture the error stack to preserve stack tracing across async calls.
        const stack = Error().stack!;

        let response: api.IBatchWriteResponse;
        try {
          logger(
            'BulkCommitBatch.bulkCommit',
            tag,
            `Sending next batch with ${this._opCount} writes`
          );
          const retryCodes = getRetryCodes('batchWrite');
          response = await this._commit<
            api.BatchWriteRequest,
            api.BatchWriteResponse
          >({retryCodes, methodName: 'batchWrite', requestTag: tag});
        } catch (err) {
          // Map the failure to each individual write's result.
          const ops = Array.from({length: this.pendingOps.length});
          response = {
            writeResults: ops.map(() => {
              return {};
            }),
            status: ops.map(() => err),
          };
        }

        for (let i = 0; i < (response.writeResults || []).length; ++i) {
          // Since delete operations currently do not have write times, use a
          // sentinel Timestamp value.
          // TODO(b/158502664): Use actual delete timestamp.
          const DELETE_TIMESTAMP_SENTINEL = Timestamp.fromMillis(0);

          const status = (response.status || [])[i];
          if (status.code === StatusCode.OK) {
            const updateTime = Timestamp.fromProto(
              response.writeResults![i].updateTime || DELETE_TIMESTAMP_SENTINEL
            );
            this.pendingOps[i].onSuccess(new WriteResult(updateTime));
          } else {
            const error =
              new (require('google-gax/build/src/fallback').GoogleError)(
                status.message || undefined
              );
            error.code = status.code as number;
            this.pendingOps[i].onError(wrapError(error, stack));
          }
        }
      },
      {
        [ATTRIBUTE_KEY_DOC_COUNT]: this._opCount,
      }
    );
  }
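
The _traceUtil helper used above is implemented in the new dev/src/telemetry files, which are not included in this excerpt. As a hypothetical sketch of the pattern only (the names and error handling below are assumptions, not the SDK's actual implementation), a startActiveSpan(name, fn, attributes) helper built on the OpenTelemetry API could look like this:

    import {trace, Attributes, SpanStatusCode} from '@opentelemetry/api';

    const tracer = trace.getTracer('nodejs-firestore');

    // Runs fn inside an active span, attaches the given attributes, records
    // failures on the span, and always ends the span.
    function startActiveSpan<T>(
      name: string,
      fn: () => Promise<T>,
      attributes?: Attributes
    ): Promise<T> {
      return tracer.startActiveSpan(name, async span => {
        if (attributes) {
          span.setAttributes(attributes);
        }
        try {
          return await fn();
        } catch (err) {
          span.recordException(err as Error);
          span.setStatus({code: SpanStatusCode.ERROR});
          throw err;
        } finally {
          span.end();
        }
      });
    }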
80 changes: 43 additions & 37 deletions dev/src/collection-group.ts
@@ -29,6 +29,7 @@ import {validateInteger} from './validate';
import api = protos.google.firestore.v1;
import {defaultConverter} from './types';
import {compareArrays} from './order';
import {SPAN_NAME_PARTITION_QUERY} from './telemetry/trace-util';

/**
* A `CollectionGroup` refers to all documents that are contained in a
@@ -81,48 +82,53 @@ export class CollectionGroup<

The partition-query logic in getPartitions is likewise wrapped in a trace span. The partitions array is declared outside the span callback so that the yield loop after the span can still read it. The updated generator:

  async *getPartitions(
    desiredPartitionCount: number
  ): AsyncIterable<QueryPartition<AppModelType, DbModelType>> {
    const partitions: Array<api.IValue>[] = [];

    await this._firestore._traceUtil.startActiveSpan(
      SPAN_NAME_PARTITION_QUERY,
      async () => {
        validateInteger('desiredPartitionCount', desiredPartitionCount, {
          minValue: 1,
        });

        const tag = requestTag();
        await this.firestore.initializeIfNeeded(tag);

        if (desiredPartitionCount > 1) {
          // Partition queries require explicit ordering by __name__.
          const queryWithDefaultOrder = this.orderBy(FieldPath.documentId());
          const request: api.IPartitionQueryRequest =
            queryWithDefaultOrder.toProto();

          // Since we are always returning an extra partition (with an empty endBefore
          // cursor), we reduce the desired partition count by one.
          request.partitionCount = desiredPartitionCount - 1;

          const stream = await this.firestore.requestStream(
            'partitionQueryStream',
            /* bidirectional= */ false,
            request,
            tag
          );
          stream.resume();

          for await (const currentCursor of stream) {
            partitions.push(currentCursor.values ?? []);
          }
        }

        logger(
          'Firestore.getPartitions',
          tag,
          'Received %d partitions',
          partitions.length
        );

        // Sort the partitions as they may not be ordered if responses are paged.
        partitions.sort((l, r) => compareArrays(l, r));
      }
    );

    for (let i = 0; i < partitions.length; ++i) {
      yield new QueryPartition(
        this._firestore,
        // ... (remainder of this hunk is truncated in the original view)
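
For context, getPartitions is the public API whose execution the new PartitionQuery span now covers. A small caller-side usage sketch, not part of this diff (the 'posts' collection-group id and partition count are made up):

    import {Firestore} from '@google-cloud/firestore';

    const firestore = new Firestore();

    async function listPartitionedQueries(): Promise<void> {
      const group = firestore.collectionGroup('posts');
      for await (const partition of group.getPartitions(4)) {
        // Each partition can be turned back into a regular query and executed,
        // potentially on a different worker.
        const snapshot = await partition.toQuery().get();
        console.log(`partition returned ${snapshot.size} documents`);
      }
    }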
Diffs for the remaining changed files are not rendered in this view.
