Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions packages/firestore/src/platform/node/grpc_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ export class GrpcConnection implements Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]> {
const results: Resp[] = [];
const responseDeferred = new Deferred<Resp[]>();

logDebug(
LOG_TAG,
`RPC '${rpcName}' invoked (streaming) with request:`,
Expand All @@ -172,13 +172,24 @@ export class GrpcConnection implements Connection {
);
const jsonRequest = { ...request, database: this.databasePath };
const stream = stub[rpcName](jsonRequest, metadata);
let callbackFired = false;
stream.on('data', (response: Resp) => {
logDebug(LOG_TAG, `RPC ${rpcName} received result:`, response);
results.push(response);
if (
expectedResponseCount !== undefined &&
results.length === expectedResponseCount
) {
callbackFired = true;
responseDeferred.resolve(results);
}
});
stream.on('end', () => {
logDebug(LOG_TAG, `RPC '${rpcName}' completed.`);
responseDeferred.resolve(results);
if (!callbackFired) {
callbackFired = true;
responseDeferred.resolve(results);
}
});
stream.on('error', (grpcError: grpc.ServiceError) => {
logDebug(LOG_TAG, `RPC '${rpcName}' failed with error:`, grpcError);
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ export interface Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]>;

/**
Expand Down
8 changes: 5 additions & 3 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class DatastoreImpl extends Datastore {
invokeStreamingRPC<Req, Resp>(
rpcName: string,
path: string,
request: Req
request: Req,
expectedResponseCount?: number
): Promise<Resp[]> {
this.verifyInitialized();
return Promise.all([
Expand All @@ -133,7 +134,8 @@ class DatastoreImpl extends Datastore {
path,
request,
authToken,
appCheckToken
appCheckToken,
expectedResponseCount
);
})
.catch((error: FirestoreError) => {
Expand Down Expand Up @@ -194,7 +196,7 @@ export async function invokeBatchGetDocumentsRpc(
const response = await datastoreImpl.invokeStreamingRPC<
ProtoBatchGetDocumentsRequest,
ProtoBatchGetDocumentsResponse
>('BatchGetDocuments', path, request);
>('BatchGetDocuments', path, request, keys.length);

const docs = new Map<string, Document>();
response.forEach(proto => {
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/rest_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ export abstract class RestConnection implements Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]> {
// The REST API automatically aggregates all of the streamed results, so we
// can just use the normal invoke() method.
Expand Down