Skip to content

Commit c5936c5

Browse files
authored
Supporting streaming API calls to callable functions (#1629)
* Fix tests. * Fix tests. * Formatting. * Allow v1 to return streaming response. * Fix lint error. * Add changelog. * Fix lint errors more. * Fix lint error. * Standardize version string from v{n} to gcfv{n}. * Fix type issues in test files.
1 parent 44a93e9 commit c5936c5

File tree

8 files changed

+153
-45
lines changed

8 files changed

+153
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Add support for callable function to return streaming response (#1629)

spec/common/providers/https.spec.ts

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,25 +49,33 @@ async function runCallableTest(test: CallTest): Promise<any> {
4949
cors: { origin: true, methods: "POST" },
5050
...test.callableOption,
5151
};
52-
const callableFunctionV1 = https.onCallHandler(opts, (data, context) => {
53-
expect(data).to.deep.equal(test.expectedData);
54-
return test.callableFunction(data, context);
55-
});
52+
const callableFunctionV1 = https.onCallHandler(
53+
opts,
54+
(data, context) => {
55+
expect(data).to.deep.equal(test.expectedData);
56+
return test.callableFunction(data, context);
57+
},
58+
"gcfv1"
59+
);
5660

5761
const responseV1 = await runHandler(callableFunctionV1, test.httpRequest);
5862

59-
expect(responseV1.body).to.deep.equal(test.expectedHttpResponse.body);
63+
expect(responseV1.body).to.deep.equal(JSON.stringify(test.expectedHttpResponse.body));
6064
expect(responseV1.headers).to.deep.equal(test.expectedHttpResponse.headers);
6165
expect(responseV1.status).to.equal(test.expectedHttpResponse.status);
6266

63-
const callableFunctionV2 = https.onCallHandler(opts, (request) => {
64-
expect(request.data).to.deep.equal(test.expectedData);
65-
return test.callableFunction2(request);
66-
});
67+
const callableFunctionV2 = https.onCallHandler(
68+
opts,
69+
(request) => {
70+
expect(request.data).to.deep.equal(test.expectedData);
71+
return test.callableFunction2(request);
72+
},
73+
"gcfv2"
74+
);
6775

6876
const responseV2 = await runHandler(callableFunctionV2, test.httpRequest);
6977

70-
expect(responseV2.body).to.deep.equal(test.expectedHttpResponse.body);
78+
expect(responseV2.body).to.deep.equal(JSON.stringify(test.expectedHttpResponse.body));
7179
expect(responseV2.headers).to.deep.equal(test.expectedHttpResponse.headers);
7280
expect(responseV2.status).to.equal(test.expectedHttpResponse.status);
7381
}
@@ -165,7 +173,7 @@ describe("onCallHandler", () => {
165173
status: 400,
166174
headers: expectedResponseHeaders,
167175
body: {
168-
error: { status: "INVALID_ARGUMENT", message: "Bad Request" },
176+
error: { message: "Bad Request", status: "INVALID_ARGUMENT" },
169177
},
170178
},
171179
});
@@ -203,7 +211,7 @@ describe("onCallHandler", () => {
203211
status: 400,
204212
headers: expectedResponseHeaders,
205213
body: {
206-
error: { status: "INVALID_ARGUMENT", message: "Bad Request" },
214+
error: { message: "Bad Request", status: "INVALID_ARGUMENT" },
207215
},
208216
},
209217
});
@@ -225,7 +233,7 @@ describe("onCallHandler", () => {
225233
status: 400,
226234
headers: expectedResponseHeaders,
227235
body: {
228-
error: { status: "INVALID_ARGUMENT", message: "Bad Request" },
236+
error: { message: "Bad Request", status: "INVALID_ARGUMENT" },
229237
},
230238
},
231239
});
@@ -244,7 +252,7 @@ describe("onCallHandler", () => {
244252
expectedHttpResponse: {
245253
status: 500,
246254
headers: expectedResponseHeaders,
247-
body: { error: { status: "INTERNAL", message: "INTERNAL" } },
255+
body: { error: { message: "INTERNAL", status: "INTERNAL" } },
248256
},
249257
});
250258
});
@@ -262,7 +270,7 @@ describe("onCallHandler", () => {
262270
expectedHttpResponse: {
263271
status: 500,
264272
headers: expectedResponseHeaders,
265-
body: { error: { status: "INTERNAL", message: "INTERNAL" } },
273+
body: { error: { message: "INTERNAL", status: "INTERNAL" } },
266274
},
267275
});
268276
});
@@ -280,7 +288,7 @@ describe("onCallHandler", () => {
280288
expectedHttpResponse: {
281289
status: 404,
282290
headers: expectedResponseHeaders,
283-
body: { error: { status: "NOT_FOUND", message: "i am error" } },
291+
body: { error: { message: "i am error", status: "NOT_FOUND" } },
284292
},
285293
});
286294
});
@@ -364,8 +372,8 @@ describe("onCallHandler", () => {
364372
headers: expectedResponseHeaders,
365373
body: {
366374
error: {
367-
status: "UNAUTHENTICATED",
368375
message: "Unauthenticated",
376+
status: "UNAUTHENTICATED",
369377
},
370378
},
371379
},
@@ -391,8 +399,8 @@ describe("onCallHandler", () => {
391399
headers: expectedResponseHeaders,
392400
body: {
393401
error: {
394-
status: "UNAUTHENTICATED",
395402
message: "Unauthenticated",
403+
status: "UNAUTHENTICATED",
396404
},
397405
},
398406
},
@@ -461,8 +469,8 @@ describe("onCallHandler", () => {
461469
headers: expectedResponseHeaders,
462470
body: {
463471
error: {
464-
status: "UNAUTHENTICATED",
465472
message: "Unauthenticated",
473+
status: "UNAUTHENTICATED",
466474
},
467475
},
468476
},
@@ -748,6 +756,53 @@ describe("onCallHandler", () => {
748756
});
749757
});
750758
});
759+
760+
describe("Streaming callables", () => {
761+
it("returns data in SSE format for requests Accept: text/event-stream header", async () => {
762+
const mockReq = mockRequest(
763+
{ message: "hello streaming" },
764+
"application/json",
765+
{},
766+
{ accept: "text/event-stream" }
767+
) as any;
768+
const fn = https.onCallHandler(
769+
{
770+
cors: { origin: true, methods: "POST" },
771+
},
772+
(req, resp) => {
773+
resp.write("hello");
774+
return "world";
775+
},
776+
"gcfv2"
777+
);
778+
779+
const resp = await runHandler(fn, mockReq);
780+
const data = [`data: {"message":"hello"}`, `data: {"result":"world"}`];
781+
expect(resp.body).to.equal([...data, ""].join("\n"));
782+
});
783+
784+
it("returns error in SSE format", async () => {
785+
const mockReq = mockRequest(
786+
{ message: "hello streaming" },
787+
"application/json",
788+
{},
789+
{ accept: "text/event-stream" }
790+
) as any;
791+
const fn = https.onCallHandler(
792+
{
793+
cors: { origin: true, methods: "POST" },
794+
},
795+
() => {
796+
throw new Error("BOOM");
797+
},
798+
"gcfv2"
799+
);
800+
801+
const resp = await runHandler(fn, mockReq);
802+
const data = [`data: {"error":{"message":"INTERNAL","status":"INTERNAL"}}`];
803+
expect(resp.body).to.equal([...data, ""].join("\n"));
804+
});
805+
});
751806
});
752807

753808
describe("encoding/decoding", () => {

spec/helper.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export function runHandler(
4747
// MockResponse mocks an express.Response.
4848
// This class lives here so it can reference resolve and reject.
4949
class MockResponse {
50+
private sentBody = "";
5051
private statusCode = 0;
5152
private headers: { [name: string]: string } = {};
5253
private callback: () => void;
@@ -65,7 +66,10 @@ export function runHandler(
6566
return this.headers[name];
6667
}
6768

68-
public send(body: any) {
69+
public send(sendBody: any) {
70+
const toSend = typeof sendBody === "object" ? JSON.stringify(sendBody) : sendBody;
71+
const body = this.sentBody ? this.sentBody + ((toSend as string) || "") : toSend;
72+
6973
resolve({
7074
status: this.statusCode,
7175
headers: this.headers,
@@ -76,6 +80,10 @@ export function runHandler(
7680
}
7781
}
7882

83+
public write(writeBody: any) {
84+
this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody;
85+
}
86+
7987
public end() {
8088
this.send(undefined);
8189
}

spec/v1/providers/https.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ describe("callable CORS", () => {
276276
const response = await runHandler(func, req as any);
277277

278278
expect(response.status).to.equal(200);
279-
expect(response.body).to.be.deep.equal({ result: 42 });
279+
expect(response.body).to.be.deep.equal(JSON.stringify({ result: 42 }));
280280
expect(response.headers).to.deep.equal(expectedResponseHeaders);
281281
});
282282
});

spec/v2/providers/https.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ describe("onCall", () => {
417417
req.method = "POST";
418418

419419
const resp = await runHandler(func, req as any);
420-
expect(resp.body).to.deep.equal({ result: 42 });
420+
expect(resp.body).to.deep.equal(JSON.stringify({ result: 42 }));
421421
});
422422

423423
it("should enforce CORS options", async () => {
@@ -496,7 +496,7 @@ describe("onCall", () => {
496496
const response = await runHandler(func, req as any);
497497

498498
expect(response.status).to.equal(200);
499-
expect(response.body).to.be.deep.equal({ result: 42 });
499+
expect(response.body).to.be.deep.equal(JSON.stringify({ result: 42 }));
500500
expect(response.headers).to.deep.equal(expectedResponseHeaders);
501501
});
502502

src/common/providers/https.ts

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,15 @@ export interface CallableRequest<T = any> {
141141
rawRequest: Request;
142142
}
143143

144+
/**
145+
* CallableProxyResponse exposes subset of express.Response object
146+
* to allow writing partial, streaming responses back to the client.
147+
*/
148+
export interface CallableProxyResponse {
149+
write: express.Response["write"];
150+
acceptsStreaming: boolean;
151+
}
152+
144153
/**
145154
* The set of Firebase Functions status codes. The codes are the same at the
146155
* ones exposed by {@link https://github.com/grpc/grpc/blob/master/doc/statuscodes.md | gRPC}.
@@ -673,7 +682,10 @@ async function checkAppCheckToken(
673682
}
674683

675684
type v1CallableHandler = (data: any, context: CallableContext) => any | Promise<any>;
676-
type v2CallableHandler<Req, Res> = (request: CallableRequest<Req>) => Res;
685+
type v2CallableHandler<Req, Res> = (
686+
request: CallableRequest<Req>,
687+
response?: CallableProxyResponse
688+
) => Res;
677689

678690
/** @internal **/
679691
export interface CallableOptions {
@@ -685,9 +697,10 @@ export interface CallableOptions {
685697
/** @internal */
686698
export function onCallHandler<Req = any, Res = any>(
687699
options: CallableOptions,
688-
handler: v1CallableHandler | v2CallableHandler<Req, Res>
700+
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
701+
version: "gcfv1" | "gcfv2"
689702
): (req: Request, res: express.Response) => Promise<void> {
690-
const wrapped = wrapOnCallHandler(options, handler);
703+
const wrapped = wrapOnCallHandler(options, handler, version);
691704
return (req: Request, res: express.Response) => {
692705
return new Promise((resolve) => {
693706
res.on("finish", resolve);
@@ -698,10 +711,15 @@ export function onCallHandler<Req = any, Res = any>(
698711
};
699712
}
700713

714+
function encodeSSE(data: unknown): string {
715+
return `data: ${JSON.stringify(data)}\n`;
716+
}
717+
701718
/** @internal */
702719
function wrapOnCallHandler<Req = any, Res = any>(
703720
options: CallableOptions,
704-
handler: v1CallableHandler | v2CallableHandler<Req, Res>
721+
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
722+
version: "gcfv1" | "gcfv2"
705723
): (req: Request, res: express.Response) => Promise<void> {
706724
return async (req: Request, res: express.Response): Promise<void> => {
707725
try {
@@ -719,7 +737,7 @@ function wrapOnCallHandler<Req = any, Res = any>(
719737
// The original monkey-patched code lived in the functionsEmulatorRuntime
720738
// (link: https://github.com/firebase/firebase-tools/blob/accea7abda3cc9fa6bb91368e4895faf95281c60/src/emulator/functionsEmulatorRuntime.ts#L480)
721739
// and was not compatible with how monorepos separate out packages (see https://github.com/firebase/firebase-tools/issues/5210).
722-
if (isDebugFeatureEnabled("skipTokenVerification") && handler.length === 2) {
740+
if (isDebugFeatureEnabled("skipTokenVerification") && version === "gcfv1") {
723741
const authContext = context.rawRequest.header(CALLABLE_AUTH_HEADER);
724742
if (authContext) {
725743
logger.debug("Callable functions auth override", {
@@ -763,26 +781,47 @@ function wrapOnCallHandler<Req = any, Res = any>(
763781
context.instanceIdToken = req.header("Firebase-Instance-ID-Token");
764782
}
765783

784+
const acceptsStreaming = req.header("accept") === "text/event-stream";
766785
const data: Req = decode(req.body.data);
767786
let result: Res;
768-
if (handler.length === 2) {
769-
result = await handler(data, context);
787+
if (version === "gcfv1") {
788+
result = await (handler as v1CallableHandler)(data, context);
770789
} else {
771790
const arg: CallableRequest<Req> = {
772791
...context,
773792
data,
774793
};
794+
// TODO: set up optional heartbeat
795+
const responseProxy: CallableProxyResponse = {
796+
write(chunk): boolean {
797+
if (acceptsStreaming) {
798+
const formattedData = encodeSSE({ message: chunk });
799+
return res.write(formattedData);
800+
}
801+
// if client doesn't accept sse-protocol, response.write() is no-op.
802+
},
803+
acceptsStreaming,
804+
};
805+
if (acceptsStreaming) {
806+
// SSE always responds with 200
807+
res.status(200);
808+
}
775809
// For some reason the type system isn't picking up that the handler
776810
// is a one argument function.
777-
result = await (handler as any)(arg);
811+
result = await (handler as any)(arg, responseProxy);
778812
}
779813

780814
// Encode the result as JSON to preserve types like Dates.
781815
result = encode(result);
782816

783817
// If there was some result, encode it in the body.
784818
const responseBody: HttpResponseBody = { result };
785-
res.status(200).send(responseBody);
819+
if (acceptsStreaming) {
820+
res.write(encodeSSE(responseBody));
821+
res.end();
822+
} else {
823+
res.status(200).send(responseBody);
824+
}
786825
} catch (err) {
787826
let httpErr = err;
788827
if (!(err instanceof HttpsError)) {
@@ -793,8 +832,11 @@ function wrapOnCallHandler<Req = any, Res = any>(
793832

794833
const { status } = httpErr.httpErrorCode;
795834
const body = { error: httpErr.toJSON() };
796-
797-
res.status(status).send(body);
835+
if (req.header("accept") === "text/event-stream") {
836+
res.send(encodeSSE(body));
837+
} else {
838+
res.status(status).send(body);
839+
}
798840
}
799841
};
800842
}

src/v1/providers/https.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,8 @@ export function _onCallWithOptions(
102102
handler: (data: any, context: CallableContext) => any | Promise<any>,
103103
options: DeploymentOptions
104104
): HttpsFunction & Runnable<any> {
105-
// onCallHandler sniffs the function length of the passed-in callback
106-
// and the user could have only tried to listen to data. Wrap their handler
107-
// in another handler to avoid accidentally triggering the v2 API
105+
// fix the length of handler to make the call to handler consistent
106+
// in the onCallHandler
108107
const fixedLen = (data: any, context: CallableContext) => {
109108
return withInit(handler)(data, context);
110109
};
@@ -115,7 +114,8 @@ export function _onCallWithOptions(
115114
consumeAppCheckToken: options.consumeAppCheckToken,
116115
cors: { origin: true, methods: "POST" },
117116
},
118-
fixedLen
117+
fixedLen,
118+
"gcfv1"
119119
)
120120
);
121121

0 commit comments

Comments
 (0)