-
Notifications
You must be signed in to change notification settings - Fork 2
Compatibility w/ AWS Fargate #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
b57d8f4
49c35f6
43a7a09
5a79e1e
9de974f
23f8171
04389ef
b77ed50
09c6ee6
e12199d
1fb9aec
0bc0798
6463ccb
21939ab
eefe267
0850a09
d58347f
f51d540
9968f96
9bf5ab5
aad4d03
21fb963
b1f7acf
680dd42
8b28927
7617a6b
a1f0f8c
84fbc7a
d6d2060
1b6af0a
d907de5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
/pkg | ||
/build | ||
/bin | ||
# /bin | ||
*.swp | ||
*.iml | ||
tags | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,7 +72,7 @@ func printEndReports(invokeId string, initDuration string, memorySize string, in | |
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize) | ||
} | ||
|
||
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) { | ||
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap, doneCallback func(invokeResp *ResponseWriterProxy)) { | ||
log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header) | ||
bodyBytes, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
|
@@ -81,6 +81,18 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i | |
return | ||
} | ||
|
||
// Fix CORS error: https://github.com/aws/aws-lambda-runtime-interface-emulator/pull/84/files#diff-2c1a36d379bd5ed6e893749912ac8473cc46dddf5765024d46b44da2eb56348d | ||
if origin := r.Header.Get("Origin"); origin != "" { | ||
w.Header().Set("Access-Control-Allow-Origin", GetenvWithDefault("ACCESS_CONTROL_ALLOW_ORIGIN", origin)) | ||
w.Header().Set("Access-Control-Allow-Methods", GetenvWithDefault("ACCESS_CONTROL_ALLOW_METHODS", "POST, OPTIONS")) | ||
w.Header().Set("Access-Control-Allow-Headers", GetenvWithDefault("ACCESS_CONTROL_ALLOW_HEADERS", "*")) | ||
w.Header().Set("Access-Control-Allow-Credentials", GetenvWithDefault("ACCESS_CONTROL_ALLOW_CREDENTIALS", "true")) | ||
} | ||
if r.Method == "OPTIONS" { | ||
w.WriteHeader(200) | ||
return | ||
} | ||
|
||
initDuration := "" | ||
inv := GetenvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "300") | ||
timeoutDuration, _ := time.ParseDuration(inv + "s") | ||
|
@@ -165,9 +177,11 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i | |
case rapidcore.ErrInvokeTimeout: | ||
printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration) | ||
|
||
w.Write([]byte(fmt.Sprintf("Task timed out after %d.00 seconds", timeout))) | ||
invokeResp.Write([]byte(fmt.Sprintf("Task timed out after %d.00 seconds", timeout))) | ||
w.Write(invokeResp.Body) | ||
time.Sleep(100 * time.Millisecond) | ||
//initDone = false | ||
doneCallback(invokeResp) // Call done after printEndReports | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At the end, call |
||
return | ||
} | ||
} | ||
|
@@ -178,6 +192,8 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs i | |
w.WriteHeader(invokeResp.StatusCode) | ||
} | ||
w.Write(invokeResp.Body) | ||
|
||
doneCallback(invokeResp) // Call done after printEndReports | ||
} | ||
|
||
func InitHandler(sandbox Sandbox, functionVersion string, timeout int64, bs interop.Bootstrap) (time.Time, time.Time) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,12 @@ | |
package main | ||
|
||
import ( | ||
"os" | ||
"sync" | ||
"bytes" | ||
"strconv" | ||
"net/http" | ||
"encoding/json" | ||
|
||
log "github.com/sirupsen/logrus" | ||
"go.amzn.com/lambda/interop" | ||
|
@@ -16,15 +21,85 @@ func startHTTPServer(ipport string, sandbox *rapidcore.SandboxBuilder, bs intero | |
Addr: ipport, | ||
} | ||
|
||
log.Warnf("Listening on %s", ipport) | ||
|
||
maxInvocations := -1 // -1 means unlimited invocations | ||
// Get max invocations from environment variable | ||
maxInvocationsStr := os.Getenv("AWS_LAMBDA_SERVER_MAX_INVOCATIONS") | ||
if maxInvocationsStr != "" { | ||
if maxInvocationsInt, err := strconv.Atoi(maxInvocationsStr); err == nil { | ||
maxInvocations = maxInvocationsInt | ||
} else { | ||
log.Panicf("Invalid value for AWS_LAMBDA_SERVER_MAX_INVOCATIONS: %s", maxInvocationsStr) | ||
} | ||
} | ||
Comment on lines
+26
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Read the |
||
|
||
// Channel to signal server shutdown | ||
shutdownChan := make(chan struct{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Golang uses these channels to block code until the |
||
|
||
// Pass a channel | ||
http.HandleFunc("/2015-03-31/functions/function/invocations", func(w http.ResponseWriter, r *http.Request) { | ||
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs) | ||
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs, func(invokeResp *ResponseWriterProxy){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is called at the end of the response |
||
|
||
// Forward response if "forward-response" header exists | ||
if forwardURL := r.Header.Get("forward-response"); forwardURL != "" { | ||
// Create a wait group to wait for the API request to finish | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
// Marshal the payload to JSON (you can use any other serialization format if needed) | ||
apiPayloadJSON, err := json.Marshal(string(invokeResp.Body)) | ||
if err != nil { | ||
log.Errorf("Failed to json marshal API payload: %s", err) | ||
return | ||
} | ||
|
||
// Create an API request to the URL in the "forward-response" header | ||
client := &http.Client{} | ||
req, err := http.NewRequest("POST", forwardURL, bytes.NewReader(apiPayloadJSON)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if |
||
// Add request headers | ||
req.Header.Add("Authorization", "Token "+os.Getenv("API_ACCESS_KEY")) | ||
req.Header.Add("Content-Type", "application/json") | ||
// Send the request | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
log.Errorf("Failed to forward response: %s", err) | ||
return | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode == 200 { | ||
log.Printf("Forwarded response was successful") | ||
} else { | ||
log.Errorf("Forwarding response failed with status code: %d", resp.StatusCode) | ||
} | ||
|
||
defer wg.Done() // Defer the Done() call to mark the API request as completed | ||
}() | ||
|
||
wg.Wait() // Wait for the API request to finish before proceeding | ||
} | ||
|
||
// Shutdown the server if the maximum number of invocations is reached | ||
maxInvocations-- | ||
if maxInvocations == 0 { | ||
close(shutdownChan) | ||
} | ||
Comment on lines
+84
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Decrement the maxInvocations, if ==0, then shut down the server |
||
}) | ||
}) | ||
|
||
// go routine (main thread waits) | ||
if err := srv.ListenAndServe(); err != nil { | ||
|
||
// go routine to handle server shutdown (main thread waits) | ||
go func() { | ||
<-shutdownChan | ||
log.Printf("Maximum invocations (%s) reached. Shutting down the server", maxInvocationsStr) | ||
if err := srv.Shutdown(nil); err != nil { | ||
log.Panic(err) | ||
} | ||
}() | ||
Comment on lines
+93
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if |
||
|
||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { | ||
log.Panic(err) | ||
} | ||
|
||
log.Warnf("Listening on %s", ipport) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixes CORS error, allows all origin