Skip to content

[core] Add support for streaming function to the local server #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 23, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
swift format
  • Loading branch information
sebsto committed Jul 12, 2025
commit c3f034063c3280d3374b3433852edb70f3d3cccd
39 changes: 25 additions & 14 deletions Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ internal struct LambdaHTTPServer {

case .end:
precondition(requestHead != nil, "Received .end without .head")

// process the buffered response for non streaming requests
if !self.isStreamingResponse(requestHead) {
// process the request and send the response
Expand All @@ -288,7 +288,7 @@ internal struct LambdaHTTPServer {
await self.responsePool.push(
LocalServerResponse(id: requestId, final: true)
)

}

requestHead = nil
Expand All @@ -311,16 +311,15 @@ internal struct LambdaHTTPServer {
/// This function checks if the request is a streaming response request
/// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked"
private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool {
requestHead.method == .POST &&
requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) &&
requestHead.headers.contains(name: "Transfer-Encoding") &&
requestHead.headers["Transfer-Encoding"].contains("chunked")
requestHead.method == .POST && requestHead.uri.hasSuffix(Consts.postResponseURLSuffix)
&& requestHead.headers.contains(name: "Transfer-Encoding")
&& requestHead.headers["Transfer-Encoding"].contains("chunked")
}

/// This function pareses and returns the requestId or nil if the request is malformed
private func getRequestId(from head: HTTPRequestHead) -> String? {
let parts = head.uri.split(separator: "/")
return parts.count > 2 ? String(parts[parts.count - 2]) : nil
let parts = head.uri.split(separator: "/")
return parts.count > 2 ? String(parts[parts.count - 2]) : nil
}
/// This function process the URI request sent by the client and by the Lambda function
///
Expand Down Expand Up @@ -378,7 +377,7 @@ internal struct LambdaHTTPServer {
try await self.sendResponse(response, outbound: outbound, logger: logger)
if response.final == true {
logger.trace("/invoke returning")
return // if the response is final, we can return and close the connection
return // if the response is final, we can return and close the connection
}
} else {
logger.error(
Expand Down Expand Up @@ -497,7 +496,7 @@ internal struct LambdaHTTPServer {
if response.final {
logger.trace("Sending end")
try await outbound.write(HTTPServerResponsePart.end(nil))
}
}
}

/// A shared data structure to store the current invocation or response requests and the continuation objects.
Expand Down Expand Up @@ -585,7 +584,13 @@ internal struct LambdaHTTPServer {
let headers: HTTPHeaders?
let body: ByteBuffer?
let final: Bool
init(id: String? = nil, status: HTTPResponseStatus? = nil, headers: HTTPHeaders? = nil, body: ByteBuffer? = nil, final: Bool = false) {
init(
id: String? = nil,
status: HTTPResponseStatus? = nil,
headers: HTTPHeaders? = nil,
body: ByteBuffer? = nil,
final: Bool = false
) {
self.requestId = id
self.status = status
self.headers = headers
Expand All @@ -604,14 +609,20 @@ internal struct LambdaHTTPServer {
let headers = HTTPHeaders([
(AmazonHeaders.requestID, self.requestId),
(
AmazonHeaders.invokedFunctionARN,
"arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
AmazonHeaders.invokedFunctionARN,
"arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"
),
(AmazonHeaders.traceID, "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=1"),
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
])

return LocalServerResponse(id: self.requestId, status: .accepted, headers: headers, body: self.request, final: true)
return LocalServerResponse(
id: self.requestId,
status: .accepted,
headers: headers,
body: self.request,
final: true
)
}
}
}
Expand Down