Skip to content

Compatibility w/ AWS Fargate #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b57d8f4
Merge pull request #53 from aws/develop
valerena Sep 29, 2021
49c35f6
v1.3 release
valerena Nov 15, 2021
43a7a09
Merge changes for v1.8 release
valerena Sep 22, 2022
5a79e1e
Merge changes for v1.9 release
valerena Nov 1, 2022
9de974f
Merge changes for v1.11 release
valerena May 3, 2023
23f8171
Merge changes for v1.12 release
valerena Jun 8, 2023
04389ef
fix cors error
w3ichen Jul 26, 2023
b77ed50
upload bin/
w3ichen Jul 26, 2023
09c6ee6
add AWS_LAMBDA_SERVER_MAX_INVOCATIONS
w3ichen Jul 29, 2023
e12199d
grace period before server shutdown
w3ichen Jul 30, 2023
1fb9aec
wait for request to finish before shutdown
w3ichen Aug 1, 2023
0bc0798
fixing done channel
w3ichen Aug 1, 2023
6463ccb
change shutdown
w3ichen Aug 2, 2023
21939ab
Added forward-response header
w3ichen Aug 2, 2023
eefe267
ADD API_ACCESS_KEY
w3ichen Aug 10, 2023
0850a09
Create lambda-entrypoint.sh (copied from docker image)
w3ichen Aug 12, 2023
d58347f
Update lambda-entrypoint.sh with api_access_key.env file
w3ichen Aug 12, 2023
f51d540
Update lambda-entrypoint.sh
w3ichen Aug 13, 2023
9968f96
Update lambda-entrypoint.sh
w3ichen Aug 13, 2023
9bf5ab5
rm api_access_key
w3ichen Aug 13, 2023
aad4d03
Update lambda-entrypoint.sh
w3ichen Sep 30, 2023
21fb963
Create lambda-entrypoint.original.sh
w3ichen Sep 30, 2023
b1f7acf
Run lambda-entrypoint.sh as user
w3ichen Sep 30, 2023
680dd42
Update lambda-entrypoint.sh
w3ichen Sep 30, 2023
8b28927
remove lambda-entrypoint
w3ichen Sep 30, 2023
7617a6b
fix seg fault error binaries
w3ichen Dec 19, 2024
a1f0f8c
fix seg fault error binaries
w3ichen Dec 19, 2024
84fbc7a
Update README.md
w3ichen Dec 19, 2024
d6d2060
new binaries with patches
w3ichen Dec 19, 2024
1b6af0a
re-upload binaries compiled for linux
w3ichen Dec 19, 2024
d907de5
re-upload binaries compiled for linux
w3ichen Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/pkg
/build
/bin
# /bin
*.swp
*.iml
tags
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
![GitHub go.mod Go version](https://img.shields.io/github/go-mod/go-version/aws/aws-lambda-runtime-interface-emulator)
![GitHub](https://img.shields.io/github/license/aws/aws-lambda-runtime-interface-emulator)

## Patches
- [Fix already reserved panic on concurrent invokes](https://github.com/aws/aws-lambda-runtime-interface-emulator/pull/133)

The Lambda Runtime Interface Emulator is a proxy for Lambda’s Runtime and Extensions APIs, which allows customers to
locally test their Lambda function packaged as a container image. It is a lightweight web-server that converts
Expand Down Expand Up @@ -171,6 +173,11 @@ The rest of these Environment Variables can be set to match AWS Lambda's environ
* `AWS_LAMBDA_FUNCTION_VERSION`
* `AWS_LAMBDA_FUNCTION_NAME`
* `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`
* `AWS_LAMBDA_SERVER_MAX_INVOCATIONS`
* `API_ACCESS_KEY`

Request headers:
* `forward-response` to forward api response to server

## Level of support

Expand Down
Binary file added bin/aws-lambda-rie
Binary file not shown.
Binary file added bin/aws-lambda-rie-arm64
Binary file not shown.
Binary file added bin/aws-lambda-rie-x86_64
Binary file not shown.
20 changes: 18 additions & 2 deletions cmd/aws-lambda-rie/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func printEndReports(invokeId string, initDuration string, memorySize string, in
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize)
}

func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) {
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap, doneCallback func(invokeResp *ResponseWriterProxy)) {
log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header)
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
Expand All @@ -81,6 +81,18 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
return
}

// Fix CORS error: https://github.com/aws/aws-lambda-runtime-interface-emulator/pull/84/files#diff-2c1a36d379bd5ed6e893749912ac8473cc46dddf5765024d46b44da2eb56348d
if origin := r.Header.Get("Origin"); origin != "" {
w.Header().Set("Access-Control-Allow-Origin", GetenvWithDefault("ACCESS_CONTROL_ALLOW_ORIGIN", origin))
w.Header().Set("Access-Control-Allow-Methods", GetenvWithDefault("ACCESS_CONTROL_ALLOW_METHODS", "POST, OPTIONS"))
w.Header().Set("Access-Control-Allow-Headers", GetenvWithDefault("ACCESS_CONTROL_ALLOW_HEADERS", "*"))
w.Header().Set("Access-Control-Allow-Credentials", GetenvWithDefault("ACCESS_CONTROL_ALLOW_CREDENTIALS", "true"))
}
if r.Method == "OPTIONS" {
w.WriteHeader(200)
return
}
Comment on lines +84 to +94
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes CORS error, allows all origin


initDuration := ""
inv := GetenvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "300")
timeoutDuration, _ := time.ParseDuration(inv + "s")
Expand Down Expand Up @@ -165,9 +177,11 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
case rapidcore.ErrInvokeTimeout:
printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)

w.Write([]byte(fmt.Sprintf("Task timed out after %d.00 seconds", timeout)))
invokeResp.Write([]byte(fmt.Sprintf("Task timed out after %d.00 seconds", timeout)))
w.Write(invokeResp.Body)
time.Sleep(100 * time.Millisecond)
//initDone = false
doneCallback(invokeResp) // Call done after printEndReports
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the end, call doneCallback

return
}
}
Expand All @@ -178,6 +192,8 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i
w.WriteHeader(invokeResp.StatusCode)
}
w.Write(invokeResp.Body)

doneCallback(invokeResp) // Call done after printEndReports
}

func InitHandler(sandbox Sandbox, functionVersion string, timeout int64, bs interop.Bootstrap) (time.Time, time.Time) {
Expand Down
83 changes: 79 additions & 4 deletions cmd/aws-lambda-rie/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
package main

import (
"os"
"sync"
"bytes"
"strconv"
"net/http"
"encoding/json"

log "github.com/sirupsen/logrus"
"go.amzn.com/lambda/interop"
Expand All @@ -16,15 +21,85 @@ func startHTTPServer(ipport string, sandbox *rapidcore.SandboxBuilder, bs intero
Addr: ipport,
}

log.Warnf("Listening on %s", ipport)

maxInvocations := -1 // -1 means unlimited invocations
// Get max invocations from environment variable
maxInvocationsStr := os.Getenv("AWS_LAMBDA_SERVER_MAX_INVOCATIONS")
if maxInvocationsStr != "" {
if maxInvocationsInt, err := strconv.Atoi(maxInvocationsStr); err == nil {
maxInvocations = maxInvocationsInt
} else {
log.Panicf("Invalid value for AWS_LAMBDA_SERVER_MAX_INVOCATIONS: %s", maxInvocationsStr)
}
}
Comment on lines +26 to +35
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read the AWS_LAMBDA_SERVER_MAX_INVOCATIONS environment variable into the maxInvocations variable.


// Channel to signal server shutdown
shutdownChan := make(chan struct{})
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Golang uses these channels to block code until the shutdownChan is triggered


// Pass a channel
http.HandleFunc("/2015-03-31/functions/function/invocations", func(w http.ResponseWriter, r *http.Request) {
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs)
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs, func(invokeResp *ResponseWriterProxy){
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is called at the end of the response


// Forward response if "forward-response" header exists
if forwardURL := r.Header.Get("forward-response"); forwardURL != "" {
// Create a wait group to wait for the API request to finish
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Marshal the payload to JSON (you can use any other serialization format if needed)
apiPayloadJSON, err := json.Marshal(string(invokeResp.Body))
if err != nil {
log.Errorf("Failed to json marshal API payload: %s", err)
return
}

// Create an API request to the URL in the "forward-response" header
client := &http.Client{}
req, err := http.NewRequest("POST", forwardURL, bytes.NewReader(apiPayloadJSON))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if forward-response exists in the headers, send a POST request to the URL at the forward-response

// Add request headers
req.Header.Add("Authorization", "Token "+os.Getenv("API_ACCESS_KEY"))
req.Header.Add("Content-Type", "application/json")
// Send the request
resp, err := client.Do(req)
if err != nil {
log.Errorf("Failed to forward response: %s", err)
return
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
log.Printf("Forwarded response was successful")
} else {
log.Errorf("Forwarding response failed with status code: %d", resp.StatusCode)
}

defer wg.Done() // Defer the Done() call to mark the API request as completed
}()

wg.Wait() // Wait for the API request to finish before proceeding
}

// Shutdown the server if the maximum number of invocations is reached
maxInvocations--
if maxInvocations == 0 {
close(shutdownChan)
}
Comment on lines +84 to +87
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decrement the maxInvocations, if ==0, then shut down the server

})
})

// go routine (main thread waits)
if err := srv.ListenAndServe(); err != nil {

// go routine to handle server shutdown (main thread waits)
go func() {
<-shutdownChan
log.Printf("Maximum invocations (%s) reached. Shutting down the server", maxInvocationsStr)
if err := srv.Shutdown(nil); err != nil {
log.Panic(err)
}
}()
Comment on lines +93 to +99
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if shutdownChan is closed, then srv.Shutdown(nil) is called to shut down the server


if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Panic(err)
}

log.Warnf("Listening on %s", ipport)
}
58 changes: 10 additions & 48 deletions lambda/rapidcore/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

/**
* Required patches:
* - Fix already reserved panic on concurrent invokes
* https://github.com/aws/aws-lambda-runtime-interface-emulator/pull/133/files
*/

package rapidcore

import (
Expand Down Expand Up @@ -114,59 +120,35 @@ type Server struct {
var _ interop.Server = (*Server)(nil)

func (s *Server) setRapidPhase(phase rapidPhase) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.rapidPhase = phase
}

func (s *Server) getRapidPhase() rapidPhase {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.rapidPhase
}

func (s *Server) setRuntimeState(state runtimeState) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.runtimeState = state
}

func (s *Server) getRuntimeState() runtimeState {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.runtimeState
}

func (s *Server) SetInvokeTimeout(timeout time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.invokeTimeout = timeout
}

func (s *Server) GetInvokeTimeout() time.Duration {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.invokeTimeout
}

func (s *Server) GetInvokeContext() *InvokeContext {
s.mutex.Lock()
defer s.mutex.Unlock()

ctx := *s.invokeCtx
return &ctx
}

func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx != nil {
return nil, ErrAlreadyReserved
}
Expand Down Expand Up @@ -233,9 +215,6 @@ func (s *Server) awaitInitCompletion() {
}

func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return "", ErrNotReserved
}
Expand All @@ -255,9 +234,6 @@ func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, err

// Release closes the invocation, making server ready for reserve again
func (s *Server) Release() error {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ErrNotReserved
}
Expand All @@ -274,9 +250,6 @@ func (s *Server) Release() error {

// GetCurrentInvokeID
func (s *Server) GetCurrentInvokeID() string {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ""
}
Expand Down Expand Up @@ -357,8 +330,6 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin

func (s *Server) SendResponse(invokeID string, headers map[string]string, reader io.Reader, trailers http.Header, request *interop.CancellableRequest) error {
s.setRuntimeState(runtimeInvokeResponseSent)
s.mutex.Lock()
defer s.mutex.Unlock()
runtimeCalledResponse := true
return s.sendResponseUnsafe(invokeID, headers, http.StatusOK, reader, trailers, request, runtimeCalledResponse)
}
Expand All @@ -379,8 +350,6 @@ func (s *Server) SendInitErrorResponse(invokeID string, resp *interop.ErrorRespo
func (s *Server) SendErrorResponse(invokeID string, resp *interop.ErrorResponse) error {
log.Debugf("Sending Error Response: %s", resp.ErrorType)
s.setRuntimeState(runtimeInvokeError)
s.mutex.Lock()
defer s.mutex.Unlock()
additionalHeaders := map[string]string{contentTypeHeader: resp.ContentType, errorTypeHeader: resp.ErrorType}
if functionResponseMode := resp.FunctionResponseMode; functionResponseMode != "" {
additionalHeaders[functionResponseModeHeader] = functionResponseMode
Expand Down Expand Up @@ -485,14 +454,10 @@ func deadlineNsFromTimeoutMs(timeoutMs int64) int64 {
}

func (s *Server) setInitFailuresChan() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.initFailures = make(chan interop.InitFailure)
}

func (s *Server) getInitFailuresChan() chan interop.InitFailure {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.initFailures
}

Expand Down Expand Up @@ -580,14 +545,10 @@ func (s *Server) FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct boo
}

func (s *Server) setCachedInitErrorResponse(errResp *interop.ErrorResponse) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.cachedInitErrorResponse = errResp
}

func (s *Server) getCachedInitErrorResponse() *interop.ErrorResponse {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.cachedInitErrorResponse
}

Expand All @@ -600,8 +561,6 @@ func (s *Server) trySendDefaultErrorResponse(resp *interop.ErrorResponse) {
}

func (s *Server) CurrentToken() *interop.Token {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.invokeCtx == nil {
return nil
}
Expand All @@ -612,6 +571,9 @@ func (s *Server) CurrentToken() *interop.Token {
// Invoke is used by the Runtime Interface Emulator (Rapid Local)
// https://github.com/aws/aws-lambda-runtime-interface-emulator
func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
s.mutex.Lock()
defer s.mutex.Unlock()

resetCtx, resetCancel := context.WithCancel(context.Background())
defer resetCancel()

Expand Down Expand Up @@ -749,7 +711,7 @@ func (s *Server) awaitInitialized() (initCompletionResponse, error) {
// since it can be called twice when a caller wants to wait until init is complete
func (s *Server) AwaitInitialized() error {
if _, err := s.awaitInitialized(); err != nil {
if releaseErr := s.Release(); err != nil {
if releaseErr := s.Release(); releaseErr != nil {
log.Infof("Error releasing after init failure %s: %s", err, releaseErr)
}
s.setRuntimeState(runtimeInitFailed)
Expand Down