Skip to content

[Functions] Add support for streamable cloud functions #14395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 114 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
231d602
[Infra] Update functions workflow to use macOS 15 for Xcode 16 jobs (…
eBlender Nov 8, 2024
a14d964
Stremable Functions.
eBlender Dec 20, 2024
a92d7c2
Changed return type.
eBlender Dec 20, 2024
10bec1d
Lint test
eBlender Dec 20, 2024
53a2aab
Remove test function
eBlender Dec 20, 2024
758fbed
Remove old test.
eBlender Dec 20, 2024
93b6c8b
Updated function, add full test.
eBlender Dec 27, 2024
a7e8fe8
Update functions
eBlender Jan 2, 2025
6d59fcd
Update FunctionsTests.swift
eBlender Jan 2, 2025
51f02b8
Cleanup HTTPCallable
eBlender Jan 2, 2025
7b61076
Add documentation for processResponseDataForStreamableContent
eBlender Jan 2, 2025
a95449e
Update Functions.swift
eBlender Jan 2, 2025
cdc49ee
Update Functions.swift
eBlender Jan 2, 2025
426b6bc
Update FunctionsTests.swift
eBlender Jan 2, 2025
9cb0a5e
Update and Cleanup
eBlender Jan 3, 2025
ad31052
Update IntegrationTests.swift
eBlender Jan 3, 2025
6ee9000
Clean up
eBlender Jan 3, 2025
1ffe73d
Update check.sh
eBlender Jan 3, 2025
9fcd91e
Bump to Main.
eBlender Jan 6, 2025
177aa8e
Merge branch 'main' into iOS-Stremable-Functions
eBlender Jan 6, 2025
f4d678b
Cleanup
eBlender Jan 6, 2025
74557e7
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 6, 2025
18f748b
Update Functions.swift
eBlender Jan 7, 2025
4f956fb
Lint check
eBlender Jan 7, 2025
4edc0ad
Function concurrency error
eBlender Jan 7, 2025
e50f69c
Update .github/workflows/functions.yml
eBlender Jan 15, 2025
7356cf9
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 15, 2025
aed47d6
Delete firebase-database-emulator.log
eBlender Jan 15, 2025
f6c6cff
Delete firebase-database-emulator.pid
eBlender Jan 15, 2025
75a7574
Update function error handling.
eBlender Jan 15, 2025
adf7366
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 15, 2025
9ef7411
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
4ee820e
Update FunctionsTests.swift
eBlender Jan 16, 2025
fd68f01
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 16, 2025
f27bf07
Update FunctionsTests.swift
eBlender Jan 16, 2025
1ffa4f0
Format and refactoring.
eBlender Jan 16, 2025
80f0991
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
756dc26
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
f031c1f
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
0df7f8d
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
3e325aa
[WIP] Add generic and basic streaming implementation
ncooke3 Jan 23, 2025
95fc340
Merge branch 'main' into iOS-Stremable-Functions
ncooke3 Jan 23, 2025
a712525
Merge branch 'main' into iOS-Stremable-Functions
ncooke3 Jan 23, 2025
7b42c35
Move unit tests
ncooke3 Jan 23, 2025
6dc3959
Merge remote-tracking branch 'origin/main' into iOS-Stremable-Functions
ncooke3 Jan 27, 2025
ca53153
Post main sync checkpoint
ncooke3 Jan 27, 2025
d3e476c
Copy over more structure from vertex
ncooke3 Jan 27, 2025
e33f74c
Updated to changes in 14376 so the impl is closer to Vertex on how th…
ncooke3 Jan 29, 2025
b98f71d
Update - remove tests
eBlender Jan 28, 2025
0ea118f
Add Documentation.
eBlender Jan 29, 2025
20892fd
Update Functions.swift
eBlender Jan 29, 2025
83eae53
Update IntegrationTests.swift
eBlender Jan 29, 2025
749b52d
Add more integration tests
ncooke3 Jan 29, 2025
480f7c3
Remove unneeded code
ncooke3 Jan 29, 2025
5587eb7
Fix comments
ncooke3 Jan 29, 2025
078210e
Rename internal stream API
ncooke3 Jan 29, 2025
caeff20
Streamable functions - Continuation of #14395 (#14465)
eBlender Feb 19, 2025
e68b47e
Style and fix TODOs
ncooke3 Feb 19, 2025
2aca67e
Decode StreamResponse
ncooke3 Feb 20, 2025
5fcaf4a
Update integration tests
ncooke3 Feb 21, 2025
e59368e
Intermediate fix for message decoding
ncooke3 Feb 21, 2025
1d75275
emulator backend add test
ncooke3 Feb 21, 2025
4b19247
Custom StreamResponse decoding and add more todos
ncooke3 Feb 21, 2025
b953f58
Don't decode result if stream response is not used
ncooke3 Feb 22, 2025
b8d32c9
Generalize decoding logic
ncooke3 Feb 25, 2025
659a19d
Only decode result in StreamResponse and throw FunctionErrors only
ncooke3 Feb 26, 2025
79b23c9
Resolve TODO & move availability attribute
ncooke3 Feb 26, 2025
5a7275e
Streamable functions api signature change (#14502)
eBlender Feb 26, 2025
1b8b700
Clean up tests following API sig. change
ncooke3 Feb 26, 2025
cb24822
Add changelog entry
ncooke3 Feb 26, 2025
fe17eb2
Remove unreleased API in emulator backend
ncooke3 Feb 26, 2025
10576c1
Remove unneeded availability attribute
ncooke3 Feb 26, 2025
b05d6f3
Resolve more testing TODOs
ncooke3 Feb 26, 2025
9de03a5
refactor private types
ncooke3 Feb 26, 2025
e2cf5f4
Add EmptyRequest type to represent parameter-less CF3
ncooke3 Feb 26, 2025
54967a8
Update doc comment
ncooke3 Feb 26, 2025
84b2950
Remove TODO that is no longer needed
ncooke3 Feb 26, 2025
60b4e6e
Remove unit test related TODO
ncooke3 Feb 26, 2025
672c5ed
Add doc header
ncooke3 Feb 26, 2025
a8313c0
Add doc comment for streamresponse
ncooke3 Feb 26, 2025
f9c557f
Apply suggestions from code review
ncooke3 Feb 27, 2025
2a55348
Review refactor
ncooke3 Feb 27, 2025
949bd63
Remove oos TODO
ncooke3 Feb 27, 2025
53c2560
Shorten test wait time to 100ms
ncooke3 Feb 27, 2025
2d59914
Docs
ncooke3 Feb 27, 2025
3723560
Add more info to doc comment
ncooke3 Feb 27, 2025
9be0b82
Wrap doc comment lines
ncooke3 Feb 27, 2025
3d709d8
Minor changes
ncooke3 Feb 27, 2025
bf21148
Remove unit tests
ncooke3 Feb 27, 2025
28bc4ed
Merge branch 'main' into streamable-functions
ncooke3 Feb 27, 2025
938bccf
Add test for Never
ncooke3 Feb 27, 2025
c9ce046
Add test for streaming non-streaming cf3
ncooke3 Feb 27, 2025
e19fa08
Add more tests
ncooke3 Feb 28, 2025
17f6898
Add top-level comment
ncooke3 Feb 28, 2025
3cf14bf
Add comment
ncooke3 Feb 28, 2025
fba5966
functions.yml
ncooke3 Feb 28, 2025
da798aa
[test] fix ci workaround
ncooke3 Feb 28, 2025
1a376f6
Fix availability check
ncooke3 Feb 28, 2025
ca67754
review (1)
ncooke3 Mar 4, 2025
65455ec
review (2)
ncooke3 Mar 4, 2025
f06ebf0
review (3) - line limit
ncooke3 Mar 4, 2025
9cf8c2c
review (4) - line limit (code)
ncooke3 Mar 4, 2025
887d139
Merge branch 'main' into streamable-functions
ncooke3 Mar 5, 2025
4b4bac6
Merge branch 'main' into streamable-functions
ncooke3 Mar 6, 2025
7e7e5f4
Post merge fix
ncooke3 Mar 6, 2025
f7da23e
Undo change
ncooke3 Mar 6, 2025
36821d0
rename test
ncooke3 Mar 6, 2025
c533101
small nit
ncooke3 Mar 6, 2025
ec33fa8
sendability
ncooke3 Mar 6, 2025
6da863b
Remote --test-spec
ncooke3 Mar 7, 2025
14058ba
Remove downloading iOS sim
ncooke3 Mar 7, 2025
4847a34
review
ncooke3 Mar 7, 2025
2f22696
review 2
ncooke3 Mar 7, 2025
66a7478
visionos ci fix
ncooke3 Mar 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .github/workflows/functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ jobs:
matrix:
target: [ios, tvos, macos, watchos]
build-env:
- os: macos-14
xcode: Xcode_15.2
- os: macos-15
xcode: Xcode_16.2
runs-on: ${{ matrix.build-env.os }}
Expand All @@ -43,14 +41,12 @@ jobs:
run: sudo xcode-select -s /Applications/${{ matrix.build-env.xcode }}.app/Contents/Developer
- name: Setup Bundler
run: scripts/setup_bundler.sh
# The integration tests are flaky on Xcode 15 so only run the unit tests. The integration tests still run with SPM.
# - name: Integration Test Server
# run: FirebaseFunctions/Backend/start.sh synchronous
- name: Integration Test Server
run: FirebaseFunctions/Backend/start.sh synchronous
- name: Build and test
run: |
scripts/third_party/travis/retry.sh scripts/pod_lib_lint.rb FirebaseFunctions.podspec \
--test-specs=unit --platforms=${{ matrix.target }}

--platforms=${{ matrix.target }}

spm-package-resolved:
runs-on: macos-14
Expand Down Expand Up @@ -145,6 +141,9 @@ jobs:
key: ${{needs.spm-package-resolved.outputs.cache_key}}
- name: Xcode
run: sudo xcode-select -s /Applications/${{ matrix.xcode }}.app/Contents/Developer
- name: Install visionOS, if needed.
if: matrix.target == 'visionOS'
run: xcodebuild -downloadPlatform visionOS
- name: Initialize xcodebuild
run: scripts/setup_spm_tests.sh
- name: Unit Tests
Expand Down
92 changes: 83 additions & 9 deletions FirebaseFunctions/Backend/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const assert = require('assert');
const functionsV1 = require('firebase-functions/v1');
const functionsV2 = require('firebase-functions/v2');

// MARK: - Utilities

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};

// MARK: - Callable Functions

exports.dataTest = functionsV1.https.onRequest((request, response) => {
assert.deepEqual(request.body, {
data: {
Expand Down Expand Up @@ -121,22 +129,18 @@ exports.timeoutTest = functionsV1.https.onRequest((request, response) => {

const streamData = ["hello", "world", "this", "is", "cool"]

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};

async function* generateText() {
for (const chunk of streamData) {
yield chunk;
await sleep(1000);
await sleep(100);
}
};

exports.genStream = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
for await (const chunk of generateText()) {
response.sendChunk({ chunk });
response.sendChunk(chunk);
}
}
return streamData.join(" ");
Expand All @@ -145,11 +149,81 @@ exports.genStream = functionsV2.https.onCall(

exports.genStreamError = functionsV2.https.onCall(
async (request, response) => {
// Note: The functions backend does not pass the error message to the
// client at this time.
throw Error("BOOM")
}
);

const weatherForecasts = {
Toronto: { conditions: 'snowy', temperature: 25 },
London: { conditions: 'rainy', temperature: 50 },
Dubai: { conditions: 'sunny', temperature: 75 }
};

async function* generateForecast(locations) {
for (const location of locations) {
yield { 'location': location, ...weatherForecasts[location.name] };
await sleep(100);
}
};

exports.genStreamWeather = functionsV2.https.onCall(
async (request, response) => {
const forecasts = [];
if (request.acceptsStreaming) {
for await (const chunk of generateText()) {
response.write({ chunk });
for await (const chunk of generateForecast(request.data)) {
forecasts.push(chunk)
response.sendChunk(chunk);
}
}
return { forecasts };
}
);

exports.genStreamWeatherError = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
for await (const chunk of generateForecast(request.data)) {
// Remove the location field, since the SDK cannot decode the message
// if it's there.
delete chunk.location;
response.sendChunk(chunk);
}
}
return "Number of forecasts generated: " + request.data.length;
}
);

exports.genStreamEmpty = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
// Send no chunks
}
// Implicitly return null.
}
);

exports.genStreamResultOnly = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
// Do not send any chunks.
}
return "Only a result";
}
);

exports.genStreamLargeData = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
const largeString = 'A'.repeat(10000);
const chunkSize = 1024;
for (let i = 0; i < largeString.length; i += chunkSize) {
const chunk = largeString.substring(i, i + chunkSize);
response.sendChunk(chunk);
await sleep(100);
}
throw Error("BOOM")
}
return "Stream Completed";
}
);
5 changes: 5 additions & 0 deletions FirebaseFunctions/Backend/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ FUNCTIONS_BIN="./node_modules/.bin/functions"
"${FUNCTIONS_BIN}" deploy timeoutTest --trigger-http
"${FUNCTIONS_BIN}" deploy genStream --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamError --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamWeather --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamWeatherError --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamEmpty --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamResultOnly --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamLargeData --trigger-http

if [ "$1" != "synchronous" ]; then
# Wait for the user to tell us to stop the server.
Expand Down
3 changes: 3 additions & 0 deletions FirebaseFunctions/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Unreleased
- [added] Streaming callable functions are now supported.

# 11.9.0
- [fixed] Fixed App Check token reporting to enable differentiating outdated
(`MISSING`) and inauthentic (`INVALID`) clients; see [Monitor App Check
Expand Down
178 changes: 177 additions & 1 deletion FirebaseFunctions/Sources/Callable+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import FirebaseSharedSwift
import Foundation

/// A `Callable` is reference to a particular Callable HTTPS trigger in Cloud Functions.
/// A `Callable` is a reference to a particular Callable HTTPS trigger in Cloud Functions.
///
/// - Note: If the Callable HTTPS trigger accepts no parameters, ``Never`` can be used for
/// iOS 17.0+. Otherwise, a simple encodable placeholder type (e.g.,
/// `struct EmptyRequest: Encodable {}`) can be used.
public struct Callable<Request: Encodable, Response: Decodable> {
/// The timeout to use when calling the function. Defaults to 70 seconds.
public var timeoutInterval: TimeInterval {
Expand Down Expand Up @@ -160,3 +164,175 @@ public struct Callable<Request: Encodable, Response: Decodable> {
return try await call(data)
}
}

/// Used to determine when a `StreamResponse<_, _>` is being decoded.
private protocol StreamResponseProtocol {}

/// A convenience type used to receive both the streaming callable function's yielded messages and
/// its return value.
///
/// This can be used as the generic `Response` parameter to ``Callable`` to receive both the
/// yielded messages and final return value of the streaming callable function.
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
public enum StreamResponse<Message: Decodable, Result: Decodable>: Decodable,
StreamResponseProtocol {
/// The message yielded by the callable function.
case message(Message)
/// The final result returned by the callable function.
case result(Result)

private enum CodingKeys: String, CodingKey {
case message
case result
}

public init(from decoder: any Decoder) throws {
do {
let container = try decoder
.container(keyedBy: Self<Message, Result>.CodingKeys.self)
guard let onlyKey = container.allKeys.first, container.allKeys.count == 1 else {
throw DecodingError
.typeMismatch(
Self<Message,
Result>.self,
DecodingError.Context(
codingPath: container.codingPath,
debugDescription: "Invalid number of keys found, expected one.",
underlyingError: nil
)
)
}

switch onlyKey {
case .message:
self = try Self
.message(container.decode(Message.self, forKey: .message))
case .result:
self = try Self
.result(container.decode(Result.self, forKey: .result))
}
} catch {
throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
}
}
}

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
public extension Callable where Request: Sendable, Response: Sendable {
/// Creates a stream that yields responses from the streaming callable function.
///
/// The request to the Cloud Functions backend made by this method automatically includes a FCM
/// token to identify the app instance. If a user is logged in with Firebase Auth, an auth ID
/// token for the user is included. If App Check is integrated, an app check token is included.
///
/// Firebase Cloud Messaging sends data to the Firebase backend periodically to collect
/// information regarding the app instance. To stop this, see `Messaging.deleteData()`. It
/// resumes with a new FCM Token the next time you call this method.
///
/// - Important: The final result returned by the callable function is only accessible when
/// using `StreamResponse` as the `Response` generic type.
///
/// Example of using `stream` _without_ `StreamResponse`:
/// ```swift
/// let callable: Callable<MyRequest, MyResponse> = // ...
/// let request: MyRequest = // ...
/// let stream = try callable.stream(request)
/// for try await response in stream {
/// // Process each `MyResponse` message
/// print(response)
/// }
/// ```
///
/// Example of using `stream` _with_ `StreamResponse`:
/// ```swift
/// let callable: Callable<MyRequest, StreamResponse<MyMessage, MyResult>> = // ...
/// let request: MyRequest = // ...
/// let stream = try callable.stream(request)
/// for try await response in stream {
/// switch response {
/// case .message(let message):
/// // Process each `MyMessage`
/// print(message)
/// case .result(let result):
/// // Process the final `MyResult`
/// print(result)
/// }
/// }
/// ```
///
/// - Parameter data: The `Request` data to pass to the callable function.
/// - Throws: A ``FunctionsError`` if the parameter `data` cannot be encoded.
/// - Returns: A stream wrapping responses yielded by the streaming callable function or
/// a ``FunctionsError`` if an error occurred.
func stream(_ data: Request? = nil) throws -> AsyncThrowingStream<Response, Error> {
let encoded: Any
do {
encoded = try encoder.encode(data)
} catch {
throw FunctionsError(.invalidArgument, userInfo: [NSUnderlyingErrorKey: error])
}

return AsyncThrowingStream { continuation in
Task {
do {
for try await response in callable.stream(encoded) {
do {
// This response JSON should only be able to be decoded to an `StreamResponse<_, _>`
// instance. If the decoding succeeds and the decoded response conforms to
// `StreamResponseProtocol`, we know the `Response` generic argument
// is `StreamResponse<_, _>`.
let responseJSON = switch response {
case .message(let json), .result(let json): json
}
let response = try decoder.decode(Response.self, from: responseJSON)
if response is StreamResponseProtocol {
continuation.yield(response)
} else {
// `Response` is a custom type that matched the decoding logic as the
// `StreamResponse<_, _>` type. Only the `StreamResponse<_, _>` type should decode
// successfully here to avoid exposing the `result` value in a custom type.
throw FunctionsError(.internal)
}
} catch let error as FunctionsError where error.code == .dataLoss {
// `Response` is of type `StreamResponse<_, _>`, but failed to decode. Rethrow.
throw error
} catch {
// `Response` is *not* of type `StreamResponse<_, _>`, and needs to be unboxed and
// decoded.
guard case let .message(messageJSON) = response else {
// Since `Response` is not a `StreamResponse<_, _>`, only messages should be
// decoded.
continue
}

do {
let boxedMessage = try decoder.decode(
StreamResponseMessage.self,
from: messageJSON
)
continuation.yield(boxedMessage.message)
} catch {
throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
}
}
}
} catch {
continuation.finish(throwing: error)
}
continuation.finish()
}
}
}

/// A container type for the type-safe decoding of the message object from the generic `Response`
/// type.
private struct StreamResponseMessage: Decodable {
let message: Response
}
}

/// A container type for differentiating between message and result responses.
enum JSONStreamResponse {
case message([String: Any])
case result([String: Any])
}
Loading
Loading