Skip to content

Commit

Permalink
Add file watching (lambci#231)
Browse files Browse the repository at this point in the history
* Add SIGHUP handler to kill/restart bootstrap in provided runtime

Fixes lambci#229

* Add file watching to provided runtime

Will kill/restart bootstrap if DOCKER_LAMBDA_WATCH env is set and files in /var/task or /opt change

* If provided runtime is running bootstrap, just exit instead

* Exit java8 runtime with error code 2 on SIGHUP or ConnectException

* Exit nodejs4.3/6.10/8.10 runtimes with error code 2 on SIGHUP or ECONNRESET

* Squash with java

* Exit python2.7/3.6 runtimes with error code 2 on SIGHUP or connection error

* Exit dotnetcore2.0/2.1 runtimes with error code 2 on SocketException
  • Loading branch information
mhart authored Dec 1, 2019
1 parent 90eb13b commit 2f584c4
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 33 deletions.
25 changes: 22 additions & 3 deletions dotnetcore2.0/run/MockBootstraps/MockRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,26 @@ unsafe InvokeData ILambdaRuntime.ReceiveInvoke(IDictionary initialEnvironmentVar
{
logs = "";
}
var result = client.GetAsync("http://127.0.0.1:9001/2018-06-01/runtime/invocation/next").Result;
HttpResponseMessage result = null;
try
{
result = client.GetAsync("http://127.0.0.1:9001/2018-06-01/runtime/invocation/next").Result;
}
catch (AggregateException ae)
{
if (ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
(ae.InnerException.InnerException is SocketException ||
// happens on dotnetcore2.0
ae.InnerException.InnerException.GetType().ToString().Equals("System.Net.Http.CurlException")))
{
System.Environment.Exit(context.StayOpen ? 2 : (invokeError == null ? 0 : 1));
}
else
{
throw ae;
}
}

if (result.StatusCode != HttpStatusCode.OK)
{
throw new Exception("Got a bad response from the bootstrap");
Expand Down Expand Up @@ -185,12 +204,12 @@ public unsafe void ReportDone(string invokeId, string errorType, bool waitForExi
}
catch (AggregateException ae)
{
if (!context.StayOpen && ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
if (ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
(ae.InnerException.InnerException is SocketException ||
// happens on dotnetcore2.0
ae.InnerException.InnerException.GetType().ToString().Equals("System.Net.Http.CurlException")))
{
System.Environment.Exit(string.IsNullOrEmpty(errorType) && invokeError == null ? 0 : 1);
System.Environment.Exit(context.StayOpen ? 2 : (string.IsNullOrEmpty(errorType) && invokeError == null ? 0 : 1));
}
else
{
Expand Down
25 changes: 22 additions & 3 deletions dotnetcore2.1/run/MockBootstraps/MockRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,26 @@ unsafe InvokeData ILambdaRuntime.ReceiveInvoke(IDictionary initialEnvironmentVar
{
logs = "";
}
var result = client.GetAsync("http://127.0.0.1:9001/2018-06-01/runtime/invocation/next").Result;
HttpResponseMessage result = null;
try
{
result = client.GetAsync("http://127.0.0.1:9001/2018-06-01/runtime/invocation/next").Result;
}
catch (AggregateException ae)
{
if (ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
(ae.InnerException.InnerException is SocketException ||
// happens on dotnetcore2.0
ae.InnerException.InnerException.GetType().ToString().Equals("System.Net.Http.CurlException")))
{
System.Environment.Exit(context.StayOpen ? 2 : (invokeError == null ? 0 : 1));
}
else
{
throw ae;
}
}

if (result.StatusCode != HttpStatusCode.OK)
{
throw new Exception("Got a bad response from the bootstrap");
Expand Down Expand Up @@ -186,12 +205,12 @@ public unsafe void ReportDone(string invokeId, string errorType, bool waitForExi
}
catch (AggregateException ae)
{
if (!context.StayOpen && ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
if (ae.InnerException is HttpRequestException && ae.InnerException.InnerException != null &&
(ae.InnerException.InnerException is SocketException ||
// happens on dotnetcore2.0
ae.InnerException.InnerException.GetType().ToString().Equals("System.Net.Http.CurlException")))
{
System.Environment.Exit(string.IsNullOrEmpty(errorType) && invokeError == null ? 0 : 1);
System.Environment.Exit(context.StayOpen ? 2 : (string.IsNullOrEmpty(errorType) && invokeError == null ? 0 : 1));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.gson.Gson;

import sun.misc.Unsafe;
import sun.misc.Signal;

@SuppressWarnings("restriction")
public class LambdaRuntime {
Expand Down Expand Up @@ -99,6 +100,13 @@ public class LambdaRuntime {
} catch (Exception e) {
throw new RuntimeException(e);
}

Signal.handle(new Signal("HUP"), (Signal signal) -> {
if (STAY_OPEN) {
systemErr("SIGHUP received, exiting runtime...");
System.exit(2);
}
});
}

public static void initRuntime() {
Expand Down Expand Up @@ -144,7 +152,7 @@ public static InvokeRequest waitForInvoke() {
throw new RuntimeException("Unexpected status code from invocation/next: " + responseCode);
}
} catch (ConnectException e) {
System.exit(errored ? 1 : 0);
System.exit(STAY_OPEN ? 2 : (errored ? 1 : 0));
}
String requestId = conn.getHeaderField("Lambda-Runtime-Aws-Request-Id");
deadlineMs = Long.parseLong(conn.getHeaderField("Lambda-Runtime-Deadline-Ms"));
Expand Down
6 changes: 5 additions & 1 deletion nodejs4.3/run/awslambda-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ var DEADLINE_MS = Date.now() + (TIMEOUT * 1000)

process.on('SIGINT', () => process.exit(0))
process.on('SIGTERM', () => process.exit(0))
process.on('SIGHUP', () => {
systemErr("SIGHUP received, exiting runtime...")
process.exit(2)
})

// Don't think this can be done in the Docker image
process.umask(2)
Expand Down Expand Up @@ -134,7 +138,7 @@ module.exports = {
})
}).on('error', err => {
if (err.code === 'ECONNRESET') {
return process.exit(errored ? 1 : 0)
return process.exit(STAY_OPEN ? 2 : (errored ? 1 : 0))
}
console.error(err)
process.exit(1)
Expand Down
6 changes: 5 additions & 1 deletion nodejs6.10/run/awslambda-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ var DEADLINE_MS = Date.now() + (TIMEOUT * 1000)

process.on('SIGINT', () => process.exit(0))
process.on('SIGTERM', () => process.exit(0))
process.on('SIGHUP', () => {
systemErr("SIGHUP received, exiting runtime...")
process.exit(2)
})

// Don't think this can be done in the Docker image
process.umask(2)
Expand Down Expand Up @@ -134,7 +138,7 @@ module.exports = {
})
}).on('error', err => {
if (err.code === 'ECONNRESET') {
return process.exit(errored ? 1 : 0)
return process.exit(STAY_OPEN ? 2 : (errored ? 1 : 0))
}
console.error(err)
process.exit(1)
Expand Down
6 changes: 5 additions & 1 deletion nodejs8.10/run/awslambda-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ var DEADLINE_MS = Date.now() + (TIMEOUT * 1000)

process.on('SIGINT', () => process.exit(0))
process.on('SIGTERM', () => process.exit(0))
process.on('SIGHUP', () => {
systemErr("SIGHUP received, exiting runtime...")
process.exit(2)
})

// Don't think this can be done in the Docker image
process.umask(2)
Expand Down Expand Up @@ -135,7 +139,7 @@ module.exports = {
})
}).on('error', err => {
if (err.code === 'ECONNRESET') {
return process.exit(errored ? 1 : 0)
return process.exit(STAY_OPEN ? 2 : (errored ? 1 : 0))
}
console.error(err)
process.exit(1)
Expand Down
1 change: 1 addition & 0 deletions provided/run/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module init
require (
github.com/go-chi/chi v4.0.2+incompatible
github.com/go-chi/render v1.0.1
github.com/rjeczalik/notify v0.9.2
)

go 1.13
4 changes: 4 additions & 0 deletions provided/run/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@ github.com/go-chi/chi v4.0.2+incompatible h1:maB6vn6FqCxrpz4FqWdh4+lwpyZIQS7YEAU
github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8=
github.com/go-chi/render v1.0.1/go.mod h1:pq4Rr7HbnsdaeHagklXub+p6Wd16Af5l9koip1OvJns=
github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8=
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7 h1:bit1t3mgdR35yN0cX0G8orgLtOuyL9Wqxa1mccLB0ig=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
82 changes: 73 additions & 9 deletions provided/run/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/go-chi/chi"
"github.com/go-chi/render"
"github.com/rjeczalik/notify"
)

var logDebug = os.Getenv("DOCKER_LAMBDA_DEBUG") != ""
Expand All @@ -36,6 +37,7 @@ var noBootstrap = os.Getenv("DOCKER_LAMBDA_NO_BOOTSTRAP") != ""
var apiPort = getEnv("DOCKER_LAMBDA_API_PORT", "9001")
var useStdin = os.Getenv("DOCKER_LAMBDA_USE_STDIN") != ""
var noModifyLogs = os.Getenv("DOCKER_LAMBDA_NO_MODIFY_LOGS") != ""
var watchMode = os.Getenv("DOCKER_LAMBDA_WATCH") != ""

var curState = "STATE_INIT"

Expand All @@ -52,7 +54,7 @@ var curContext *mockLambdaContext
var bootstrapCmd *exec.Cmd
var initPrinted bool
var eventChan chan *mockLambdaContext
var exited bool
var bootstrapExitedGracefully bool
var bootstrapIsRunning bool
var bootstrapPath *string
var bootstrapArgs []string
Expand Down Expand Up @@ -171,6 +173,10 @@ func main() {
serverInitEnd = time.Now()

if stayOpen {
if watchMode {
setupFileWatchers()
}
setupSighupHandler()
systemLog(fmt.Sprintf("Lambda API listening on port %s...", apiPort))
<-interrupt
} else {
Expand Down Expand Up @@ -202,6 +208,36 @@ func main() {
exit(exitCode)
}

func setupSighupHandler() {
sighupReceiver := make(chan os.Signal, 1)
signal.Notify(sighupReceiver, syscall.SIGHUP)
go func() {
for {
<-sighupReceiver
systemLog(fmt.Sprintf("SIGHUP received, restarting bootstrap..."))
reboot()
}
}()
}

func setupFileWatchers() {
fileWatcher := make(chan notify.EventInfo, 1)
if err := notify.Watch("/var/task/...", fileWatcher, notify.All); err != nil {
log.Fatal(err)
}
if err := notify.Watch("/opt/...", fileWatcher, notify.All); err != nil {
log.Fatal(err)
}
go func() {
for {
ei := <-fileWatcher
debug("Received notify event: ", ei)
systemLog(fmt.Sprintf("Handler/layer file changed, restarting bootstrap..."))
reboot()
}
}()
}

func formatOneLineJSON(body []byte) string {
payloadObj := &json.RawMessage{}
if json.Unmarshal(body, payloadObj) == nil {
Expand Down Expand Up @@ -263,6 +299,7 @@ func ensureBootstrapIsRunning(context *mockLambdaContext) error {
}

bootstrapIsRunning = true
bootstrapExitedGracefully = false

// Get an initial read of memory, and update when we finish
context.MaxMem, _ = allProcsMemoryInMb()
Expand All @@ -271,7 +308,7 @@ func ensureBootstrapIsRunning(context *mockLambdaContext) error {
bootstrapCmd.Wait()
bootstrapIsRunning = false
curState = "STATE_INIT"
if !exited {
if !bootstrapExitedGracefully {
// context may have changed, use curContext instead
curContext.SetError(fmt.Errorf("Runtime exited without providing a reason"))
}
Expand All @@ -281,11 +318,23 @@ func ensureBootstrapIsRunning(context *mockLambdaContext) error {
}

func exit(exitCode int) {
exited = true
killBootstrap()
os.Exit(exitCode)
}

func reboot() {
if noBootstrap {
os.Exit(2)
} else {
killBootstrap()
}
}

func killBootstrap() {
bootstrapExitedGracefully = true
if bootstrapCmd != nil && bootstrapCmd.Process != nil {
syscall.Kill(-bootstrapCmd.Process.Pid, syscall.SIGKILL)
}
os.Exit(exitCode)
}

func waitForContext(context *mockLambdaContext) {
Expand Down Expand Up @@ -402,7 +451,7 @@ func createRuntimeRouter() *chi.Mux {
r.
With(updateState("STATE_INIT_ERROR")).
Post("/init/error", func(w http.ResponseWriter, r *http.Request) {
debug("In /init/error...")
debug("In /init/error")
curContext = <-eventChan
handleErrorRequest(w, r)
curContext.EndInvoke(nil)
Expand All @@ -411,14 +460,28 @@ func createRuntimeRouter() *chi.Mux {
r.
With(updateState("STATE_INVOKE_NEXT")).
Get("/invocation/next", func(w http.ResponseWriter, r *http.Request) {
debug("In /invocation/next...")
debug("In /invocation/next")

if curContext.Reply != nil {
debug("Reply is not nil...")
debug("Reply is not nil")
curContext.EndInvoke(nil)
}

closeNotify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-closeNotify
debug("Connection closed, sending ignore event")
eventChan <- &mockLambdaContext{Ignore: true}
}()

debug("Waiting for next event...")
curContext = <-eventChan
context := curContext
context := <-eventChan
if context.Ignore {
debug("Ignore event received, returning")
w.Write([]byte{})
return
}
curContext = context
context.LogStartRequest()

w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -732,6 +795,7 @@ type mockLambdaContext struct {
LogTail string // base64 encoded tail, no greater than 4096 bytes
ErrorType string // Unhandled vs Handled
Ended bool
Ignore bool
}

func (mc *mockLambdaContext) ParseTimeout() {
Expand Down
18 changes: 11 additions & 7 deletions python2.7/run/runtime_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
from httplib import HTTPConnection


signal.signal(signal.SIGINT, lambda x, y: sys.exit(0))
signal.signal(signal.SIGTERM, lambda x, y: sys.exit(0))

ORIG_STDOUT = sys.stdout
ORIG_STDERR = sys.stderr

Expand Down Expand Up @@ -100,6 +97,15 @@
MOCKSERVER_CONN = HTTPConnection("127.0.0.1", 9001)


def sighup_handler(signum, frame):
eprint("SIGHUP received, exiting runtime...")
sys.exit(2)

signal.signal(signal.SIGINT, lambda x, y: sys.exit(0))
signal.signal(signal.SIGTERM, lambda x, y: sys.exit(0))
signal.signal(signal.SIGHUP, sighup_handler)


def eprint(*args, **kwargs):
print(*args, file=ORIG_STDERR, **kwargs)

Expand Down Expand Up @@ -181,10 +187,8 @@ def receive_invoke():
if resp.status != 200:
raise Exception("/invocation/next return status %d" % resp.status)
except Exception:
if INVOKED and not STAY_OPEN:
sys.exit(1 if ERRORED else 0)
return ()
raise
sys.exit(2 if STAY_OPEN else (1 if ERRORED else 0))
return ()

INVOKEID = resp.getheader('Lambda-Runtime-Aws-Request-Id')
DEADLINE_MS = int(resp.getheader('Lambda-Runtime-Deadline-Ms'))
Expand Down
Loading

0 comments on commit 2f584c4

Please sign in to comment.