Skip to content

Commit f425e86

Browse files
committed
simplify away http proxy; fix forever-deferred body close bug
1 parent 8a25294 commit f425e86

File tree

8 files changed

+71
-101
lines changed

8 files changed

+71
-101
lines changed

src/lambda/lambdaInstance.go

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"io"
55
"log"
66
"net/http"
7-
"net/http/httputil"
87
"strings"
9-
"time"
108

119
"github.com/open-lambda/open-lambda/ol/common"
1210
"github.com/open-lambda/open-lambda/ol/sandbox"
@@ -40,8 +38,6 @@ func (linst *LambdaInstance) Task() {
4038
f := linst.lfunc
4139

4240
var sb sandbox.Sandbox = nil
43-
//var client *http.Client = nil // whenever we create a Sandbox, we init this too
44-
var proxy *httputil.ReverseProxy = nil // whenever we create a Sandbox, we init this too
4541
var err error
4642

4743
for {
@@ -129,27 +125,11 @@ func (linst *LambdaInstance) Task() {
129125
f.doneChan <- req
130126
continue // wait for another request before retrying
131127
}
132-
133-
log.Printf("Connecting to sandbox")
134-
135-
proxy, err = sb.HTTPProxy()
136-
if err != nil {
137-
linst.TrySendError(req, http.StatusBadGateway, "could not connect to Sandbox: "+err.Error()+"\n", sb)
138-
f.doneChan <- req
139-
f.printf("discard sandbox %s due to Channel error: %v", sb.ID(), err)
140-
sb = nil
141-
continue // wait for another request before retrying
142-
}
143128
}
144129
t.T1()
145130

146131
// below here, we're guaranteed (1) sb != nil, (2) proxy != nil, (3) sb is unpaused
147132

148-
client := http.Client{
149-
Transport: proxy.Transport,
150-
Timeout: time.Second * time.Duration(common.Conf.Limits.Max_runtime_default),
151-
}
152-
153133
// serve until we incoming queue is empty
154134
t = common.T0("LambdaInstance-ServeRequests")
155135
for req != nil {
@@ -163,7 +143,7 @@ func (linst *LambdaInstance) Task() {
163143
if err != nil {
164144
linst.TrySendError(req, http.StatusInternalServerError, "Could not create NewRequest: "+err.Error(), sb)
165145
} else {
166-
resp, err := client.Do(httpReq)
146+
resp, err := sb.Client().Do(httpReq)
167147

168148
// copy response out
169149
if err != nil {
@@ -179,13 +159,14 @@ func (linst *LambdaInstance) Task() {
179159
req.w.WriteHeader(resp.StatusCode)
180160

181161
// copy body
182-
defer resp.Body.Close()
183162
if _, err := io.Copy(req.w, resp.Body); err != nil {
184163
// already used WriteHeader, so can't use that to surface on error anymore
185164
msg := "reading lambda response failed: "+err.Error()+"\n"
186165
f.printf("error: "+msg)
187166
linst.TrySendError(req, 0, msg, sb)
188167
}
168+
169+
resp.Body.Close()
189170
}
190171
}
191172

src/lambda/packagePuller.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,11 +275,6 @@ func (pp *PackagePuller) sandboxInstall(p *Package) (err error) {
275275
}
276276
defer sb.Destroy("package installation complete")
277277

278-
proxy, err := sb.HTTPProxy()
279-
if err != nil {
280-
return err
281-
}
282-
283278
// we still need to run a Sandbox to parse the dependencies, even if it is already installed
284279
msg := fmt.Sprintf(`{"pkg": "%s", "alreadyInstalled": %v}`, p.name, alreadyInstalled)
285280
reqBody := bytes.NewReader([]byte(msg))
@@ -289,7 +284,7 @@ func (pp *PackagePuller) sandboxInstall(p *Package) (err error) {
289284
if err != nil {
290285
return err
291286
}
292-
resp, err := proxy.Transport.RoundTrip(req)
287+
resp, err := sb.Client().Do(req)
293288
if err != nil {
294289
return err
295290
}

src/sandbox/api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package sandbox
22

33
import (
44
"github.com/open-lambda/open-lambda/ol/common"
5-
"net/http/httputil"
5+
"net/http"
66
)
77

88
type SandboxPool interface {
@@ -60,7 +60,7 @@ type Sandbox interface {
6060
Unpause() error
6161

6262
// Communication channel to forward requests.
63-
HTTPProxy() (*httputil.ReverseProxy, error)
63+
Client() (*http.Client)
6464

6565
// Lookup metadata that Sandbox was initialized with (static over time)
6666
Meta() *SandboxMeta
@@ -81,7 +81,7 @@ type Sandbox interface {
8181
// Child calls this on parent to notify of child Destroy
8282
childExit(child Sandbox)
8383

84-
GetRuntimeType() common.RuntimeType
84+
GetRuntimeType() common.RuntimeType // TODO: make it part of SandboxMeta?
8585
}
8686

8787
type SandboxMeta struct {

src/sandbox/docker.go

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@ import (
1515
"fmt"
1616
"io/ioutil"
1717
"log"
18-
"net"
19-
"net/http"
20-
"net/http/httputil"
21-
"net/url"
2218
"os"
19+
"net/http"
2320
"path/filepath"
2421
"time"
2522

@@ -37,6 +34,7 @@ type DockerContainer struct {
3734
installed map[string]bool
3835
meta *SandboxMeta
3936
rtType common.RuntimeType
37+
httpClient *http.Client
4038
}
4139

4240
type HandlerState int
@@ -111,26 +109,8 @@ func (c *DockerContainer) State() (hstate HandlerState, err error) {
111109
return hstate, nil
112110
}
113111

114-
// HTTPProxy returns a file socket channel for direct communication with the sandbox.
115-
func (c *DockerContainer) HTTPProxy() (p *httputil.ReverseProxy, err error) {
116-
sockPath := filepath.Join(c.hostDir, "ol.sock")
117-
if len(sockPath) > 108 {
118-
return nil, fmt.Errorf("socket path length cannot exceed 108 characters (try moving cluster closer to the root directory")
119-
}
120-
121-
dial := func(proto, addr string) (net.Conn, error) {
122-
return net.Dial("unix", sockPath)
123-
}
124-
125-
tr := &http.Transport{Dial: dial}
126-
u, err := url.Parse("http://sock-container")
127-
if err != nil {
128-
panic(err)
129-
}
130-
131-
proxy := httputil.NewSingleHostReverseProxy(u)
132-
proxy.Transport = tr
133-
return proxy, nil
112+
func (c *DockerContainer) Client() (*http.Client) {
113+
return c.httpClient
134114
}
135115

136116
// Start starts the container.
@@ -165,6 +145,9 @@ func (c *DockerContainer) Pause() error {
165145
return c.dockerError(err)
166146
}
167147

148+
// idle connections use a LOT of memory in the OL process
149+
c.httpClient.CloseIdleConnections()
150+
168151
return nil
169152
}
170153

src/sandbox/dockerPool.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"strings"
77
"sync/atomic"
88
"syscall"
9+
"net"
10+
"net/http"
11+
"time"
912

1013
docker "github.com/fsouza/go-dockerclient"
1114

@@ -135,6 +138,21 @@ func (pool *DockerPool) Create(parent Sandbox, isLeaf bool, codeDir, scratchDir
135138
return nil, err
136139
}
137140

141+
// start HTTP client
142+
sockPath := filepath.Join(c.hostDir, "ol.sock")
143+
if len(sockPath) > 108 {
144+
return nil, fmt.Errorf("socket path length cannot exceed 108 characters (try moving cluster closer to the root directory")
145+
}
146+
147+
dial := func(proto, addr string) (net.Conn, error) {
148+
return net.Dial("unix", sockPath)
149+
}
150+
151+
c.httpClient = &http.Client{
152+
Transport: &http.Transport{Dial: dial},
153+
Timeout: time.Second * time.Duration(common.Conf.Limits.Max_runtime_default),
154+
}
155+
138156
// wrap to make thread-safe and handle container death
139157
safe := newSafeSandbox(c)
140158
safe.startNotifyingListeners(pool.eventHandlers)

src/sandbox/safeSandbox.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ package sandbox
1010
import (
1111
"fmt"
1212
"log"
13-
"net/http/httputil"
13+
"net/http"
1414
"strings"
1515
"sync"
1616

@@ -167,22 +167,16 @@ func (sb *safeSandbox) Unpause() (err error) {
167167
return nil
168168
}
169169

170-
func (sb *safeSandbox) HttpProxy() (p *httputil.ReverseProxy, err error) {
171-
sb.printf("Channel()")
172-
t := common.T0("Channel()")
173-
defer t.T1()
174-
sb.Mutex.Lock()
175-
defer sb.Mutex.Unlock()
176-
177-
if sb.dead != nil {
178-
return nil, sb.dead
179-
}
180-
181-
p, err = sb.Sandbox.HTTPProxy()
182-
if err != nil {
183-
sb.destroyOnErr("HttpProxy", err) // TODO: rename if/when HttpProxy is replaced
184-
}
185-
return p, err
170+
func (sb *safeSandbox) Client() (*http.Client) {
171+
// According to the docs, "Clients and Transports are safe for
172+
// concurrent use by multiple goroutines and for efficiency
173+
// should only be created once and re-used."
174+
//
175+
// So we don't use any locking around this one. This also
176+
// can't fail because the client should have been created
177+
// during sandbox initialization (and if there were any error,
178+
// it should have occured then).
179+
return sb.Sandbox.Client()
186180
}
187181

188182
// fork (as a private method) doesn't cleanup parent sb if fork fails

src/sandbox/sock.go

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import (
55
"io/ioutil"
66
"log"
77
"sync/atomic"
8-
"net"
98
"net/http"
10-
"net/http/httputil"
11-
"net/url"
129
"os"
1310
"os/exec"
1411
"path/filepath"
@@ -29,6 +26,7 @@ type SOCKContainer struct {
2926
scratchDir string
3027
cg *Cgroup
3128
rtType common.RuntimeType
29+
client *http.Client
3230

3331
// 1 for self, plus 1 for each child (we can't release memory
3432
// until all descendants are dead, because they share the
@@ -58,32 +56,6 @@ func (c *SOCKContainer) GetRuntimeType() common.RuntimeType {
5856
return c.rtType
5957
}
6058

61-
func (container *SOCKContainer) HTTPProxy() (p *httputil.ReverseProxy, err error) {
62-
// note, for debugging, you can directly contact the sock file like this:
63-
// curl -XPOST --unix-socket ./ol.sock http:/test -d '{"some": "data"}'
64-
65-
sockPath := filepath.Join(container.scratchDir, "ol.sock")
66-
if len(sockPath) > 108 {
67-
return nil, fmt.Errorf("socket path length cannot exceed 108 characters (try moving cluster closer to the root directory")
68-
}
69-
70-
log.Printf("Connecting to container at '%s'", sockPath)
71-
72-
dial := func(proto, addr string) (net.Conn, error) {
73-
return net.Dial("unix", sockPath)
74-
}
75-
76-
tr := &http.Transport{Dial: dial}
77-
u, err := url.Parse("http://sock-container")
78-
if err != nil {
79-
panic(err)
80-
}
81-
82-
proxy := httputil.NewSingleHostReverseProxy(u)
83-
proxy.Transport = tr
84-
return proxy, nil
85-
}
86-
8759
func (container *SOCKContainer) freshProc() (err error) {
8860
// get FD to cgroup
8961
cgFiles := make([]*os.File, 1)
@@ -267,6 +239,9 @@ func (container *SOCKContainer) Pause() (err error) {
267239
}
268240
}
269241

242+
// save a little memory
243+
container.client.CloseIdleConnections()
244+
270245
return nil
271246
}
272247

@@ -436,6 +411,10 @@ func (container *SOCKContainer) Meta() *SandboxMeta {
436411
return container.meta
437412
}
438413

414+
func (c *SOCKContainer) Client() (*http.Client) {
415+
return c.client
416+
}
417+
439418
// GetRuntimeLog returns the log of the runtime
440419
func (container *SOCKContainer) GetRuntimeLog() string {
441420
data, err := ioutil.ReadFile(filepath.Join(container.scratchDir, "ol-runtime.log"))

src/sandbox/sockPool.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"path/filepath"
88
"strings"
99
"sync/atomic"
10+
"net"
11+
"net/http"
12+
"time"
1013

1114
"github.com/open-lambda/open-lambda/ol/common"
1215
)
@@ -170,6 +173,23 @@ func (pool *SOCKPool) Create(parent Sandbox, isLeaf bool, codeDir, scratchDir st
170173
t2.T1()
171174
}
172175

176+
// start HTTP client
177+
sockPath := filepath.Join(cSock.scratchDir, "ol.sock")
178+
if len(sockPath) > 108 {
179+
return nil, fmt.Errorf("socket path length cannot exceed 108 characters (try moving cluster closer to the root directory")
180+
}
181+
182+
log.Printf("Connecting to container at '%s'", sockPath)
183+
dial := func(proto, addr string) (net.Conn, error) {
184+
return net.Dial("unix", sockPath)
185+
}
186+
187+
cSock.client = &http.Client{
188+
Transport: &http.Transport{Dial: dial},
189+
Timeout: time.Second * time.Duration(common.Conf.Limits.Max_runtime_default),
190+
}
191+
192+
// event handling
173193
safe.startNotifyingListeners(pool.eventHandlers)
174194
return c, nil
175195
}

0 commit comments

Comments
 (0)