Skip to content

Commit 83caba2

Browse files
committed
util: use cmux instead of a custom solution
1 parent 98b8f4a commit 83caba2

File tree

3 files changed

+89
-257
lines changed

3 files changed

+89
-257
lines changed

GLOCKFILE

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,29 @@ cmd github.com/robfig/glock
1414
cmd github.com/tebeka/go2xunit
1515
cmd golang.org/x/tools/cmd/goimports
1616
cmd golang.org/x/tools/cmd/stringer
17-
github.com/Sirupsen/logrus 57cce1ed6103dce7791881d3e69e55f90d986aa5
17+
github.com/Sirupsen/logrus 74bde9ea4c5b2c78995da8ed56d37e75644dc941
1818
github.com/VividCortex/ewma c34099b489e4ac33ca8d8c5f9d29d6eeaf69f2ed
1919
github.com/agtorre/gocolorize f42b554bf7f006936130c9bb4f971afd2d87f671
2020
github.com/biogo/store 3b4c041f52c224ee4a44f5c8b150d003a40643a0
2121
github.com/chzyer/readline a0bb3f70e4e781b8249963efda8dee347565f2a6
22-
github.com/client9/misspell 0c4632c84ed240443adb04ec6b7fa90711f27578
22+
github.com/client9/misspell c9127cb48d96739b0712cff253d86f05d1f431b1
2323
github.com/cloudfoundry/gosigar 3ed7c74352dae6dc00bdc8c74045375352e3ec05
2424
github.com/cockroachdb/c-lz4 c40aaae2fc50293eb8750b34632bc3efe813e23f
2525
github.com/cockroachdb/c-protobuf 4feb192131ea08dfbd7253a00868ad69cbb61b81
2626
github.com/cockroachdb/c-rocksdb c0124c907c74b579d9d3d48eb96471bef270bc25
2727
github.com/cockroachdb/c-snappy 5c6d0932e0adaffce4bfca7bdf2ac37f79952ccf
2828
github.com/cockroachdb/stress aa7690c22fd0abd6168ed0e6c361e4f4c5f7ab25
2929
github.com/codahale/hdrhistogram e88be87d51429689cef99043a54150d733265cd7
30-
github.com/coreos/etcd c3824b10daa09fe99fba66a329db881adeb03aae
30+
github.com/coreos/etcd 410d32a9b14f6052a834a966a02950cde518d7ce
3131
github.com/cpuguy83/go-md2man 2724a9c9051aa62e9cca11304e7dd518e9e41599
32-
github.com/docker/engine-api 0c2bfae8bef6432b075411062c9840406bf6ad3e
32+
github.com/docker/engine-api adfa32fb6ccad724d73a16d53ce3defa46df27e5
3333
github.com/docker/go-connections 34b5052da6b11e27f5f2e357b38b571ddddd3928
3434
github.com/docker/go-units 5d2041e26a699eaca682e2ea41c8f891e1060444
3535
github.com/elazarl/go-bindata-assetfs 57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2
3636
github.com/go-sql-driver/mysql 1309049fd598c1faa0290fd70ab1afde2b6df0f9
3737
github.com/gogo/protobuf 82d16f734d6d871204a3feb1a73cb220cc92574c
3838
github.com/golang/lint 32a87160691b3c96046c0c678fe57c5bef761456
39-
github.com/golang/protobuf 127091107ff5f822298f1faa7487ffcf578adcf6
39+
github.com/golang/protobuf 3c84672111d91bb5ac31719e112f9f7126a0e26e
4040
github.com/google/btree cc6329d4279e3f025a53a83c397d2339b5705c45
4141
github.com/gordonklaus/ineffassign 507e48671a072e56dbc5dbbee2872ce0d662697e
4242
github.com/jteeuwen/go-bindata a0ff2567cfb70903282db057e799fd826784d41d
@@ -51,22 +51,23 @@ github.com/mattn/go-isatty 56b76bdf51f7708750eac80fa38b952bb9f32639
5151
github.com/mattn/go-runewidth d037b52ae5c0338c2bb18da01fb7ddf0e8be9aa1
5252
github.com/mibk/dupl c450df04426c2f8c35d91fb588feb88fbe328915
5353
github.com/olekukonko/tablewriter cca8bbc0798408af109aaaa239cbd2634846b340
54-
github.com/opencontainers/runc 5204301d1be4ce3f73d4b0a19feb67f3d2adaa6c
54+
github.com/opencontainers/runc 2c3115481ee1782ad687a9e0b4834f89533c2acf
5555
github.com/opennota/check 2647c7f78677e5af42e988a36343bc83194b7109
56-
github.com/opentracing/opentracing-go a8de6ae21279a71fcbd8fa507666db7e8ad6097c
56+
github.com/opentracing/opentracing-go 0d6663129887f81abfd874da4faa8438a46b42dc
5757
github.com/rcrowley/go-metrics 51425a2415d21afadfd55cd93432c0bc69e9598d
5858
github.com/robfig/glock cb3c3ec56de988289cab7bbd284eddc04dfee6c9
5959
github.com/russross/blackfriday 006144af03eeeff1037240a71865a9fd61f1c25f
6060
github.com/satori/go.uuid e673fdd4dea8a7334adbbe7f57b7e4b00bdc5502
6161
github.com/shurcooL/sanitized_anchor_name 10ef21a441db47d8b13ebcc5fd2310f636973c77
62+
github.com/soheilhy/cmux 1a2fcbde3ab1e722390239b941ad35ec716b0d1c
6263
github.com/spf13/cobra 65a708cee0a4424f4e353d031ce440643e312f92
6364
github.com/spf13/pflag 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7
6465
github.com/tebeka/go2xunit 304a3dad367b395460778a969d87101b778b322f
6566
github.com/termie/go-shutil bcacb06fecaeec8dc42af03c87c6949f4a05c74c
6667
golang.org/x/crypto 1f22c0103821b9390939b6776727195525381532
67-
golang.org/x/net b6d7b1396ec874c3b00f6c84cd4301a17c56c8ed
68+
golang.org/x/net 4599ae7937fce9b670ce32b8ad32bbb7ae726b3e
6869
golang.org/x/text 07b9a78963006a15c538ec5175243979025fa7a8
69-
golang.org/x/tools a17fa845d74d17bf6d466f84fd9d3c5602669e53
70-
google.golang.org/grpc 178b68e2819d3ed711040301c12f3f97b6c270ce
70+
golang.org/x/tools f7268ab39bf8b78faa0c2f9eb524b9ea85982c41
71+
google.golang.org/grpc 19b24c30421ae3b637495b7b6bfa656cb72fad9d
7172
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
7273
gopkg.in/yaml.v1 9f9df34309c04878acc86042b16630b0f696e1de

util/net.go

Lines changed: 78 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -17,136 +17,78 @@
1717
package util
1818

1919
import (
20-
"bytes"
2120
"crypto/tls"
22-
"io"
2321
"net"
2422
"net/http"
2523
"strings"
2624
"sync"
25+
"time"
2726

27+
"github.com/soheilhy/cmux"
2828
"golang.org/x/net/http2"
2929

3030
"github.com/cockroachdb/cockroach/util/log"
3131
"github.com/cockroachdb/cockroach/util/stop"
3232
)
3333

34-
const eol = "\r\n"
35-
const hostHeader = "Host: CRDB" + eol
36-
37-
var headerInsertionIndex = int64(strings.Index(http2.ClientPreface, eol) + len(eol))
38-
39-
type replayableConn struct {
40-
net.Conn
41-
buf bytes.Buffer
42-
reader io.Reader
43-
}
44-
45-
func newReplayableConn(conn net.Conn) *replayableConn {
46-
rc := replayableConn{Conn: conn}
47-
rc.reader = io.LimitReader(io.TeeReader(conn, &rc.buf), headerInsertionIndex)
48-
return &rc
49-
}
50-
51-
func (rc *replayableConn) replay() *replayableConn {
52-
rc.reader = io.MultiReader(&rc.buf, rc.Conn)
53-
return rc
54-
}
34+
// ListenAndServe creates a listener and serves handler on it, closing
35+
// the listener when signalled by the stopper.
36+
func ListenAndServe(stopper *stop.Stopper, handler http.Handler, addr net.Addr, tlsConfig *tls.Config) (net.Listener, error) {
37+
ln, err := net.Listen(addr.Network(), addr.String())
38+
if err == nil {
39+
stopper.RunWorker(func() {
40+
<-stopper.ShouldDrain()
41+
// Some unit tests manually close `ln`, so it may already be closed
42+
// when we get here.
43+
if err := ln.Close(); err != nil && !IsClosedConnection(err) {
44+
log.Fatal(err)
45+
}
46+
})
5547

56-
func (rc *replayableConn) Read(p []byte) (int, error) {
57-
if limitReader, ok := rc.reader.(*io.LimitedReader); ok {
58-
// rc.reader is a LimitedReader wrapping a TeeReader.
59-
off := headerInsertionIndex - limitReader.N
60-
n, err := rc.reader.Read(p)
61-
if !strings.HasPrefix(http2.ClientPreface[off:], string(p[:n])) {
62-
// The incoming request is not an HTTP2 request, so buffering is no
63-
// longer required; send all reads directly to the underlying net.Conn.
64-
rc.reader = rc.Conn
65-
rc.buf = bytes.Buffer{} // Release the memory.
66-
} else if err == io.EOF {
67-
// We've exhausted our LimitedReader, which means he incoming request is
68-
// an HTTP2 request. Remember, that we are in this code path means that
69-
// TLS is not in use, which means the caller (net/http machinery) won't
70-
// be able to parse anything after http2.ClientPreface. However, Go 1.6
71-
// introduced strict Host header checking for HTTP >= 1.1 requests (see
72-
// https://github.com/golang/go/commit/6e11f45), and http2.ClientPreface
73-
// contains enough information for the caller to identify this first
74-
// request as an HTTP 2 request, but not enough for the caller to
75-
// determine the value of the Host header. On the next line, we're going
76-
// to help the caller out by providing a bogus HTTP 1.x-style Host
77-
// header. This will get us past Host header verification.
78-
//
79-
// Note that this bogus header won't reappear after replay is called.
80-
rc.reader = io.MultiReader(strings.NewReader(hostHeader), limitReader.R)
48+
if tlsConfig != nil {
49+
ServeHandler(stopper, handler, tls.NewListener(ln, tlsConfig), tlsConfig)
8150
} else {
82-
// The LimitedReader isn't exhausted yet, or we hit an error.
83-
return n, err
84-
}
51+
m := cmux.New(ln)
52+
h2L := m.Match(cmux.HTTP2())
53+
anyL := m.Match(cmux.Any())
54+
55+
var h2 http2.Server
8556

86-
// Cribbed from io.Multireader.
87-
if n > 0 || err != io.EOF {
88-
if err == io.EOF {
89-
// Don't return EOF yet. We've replaced our LimitedReader with rc.Conn or
90-
// a new MultiReader and there may be more bytes in there.
91-
err = nil
57+
serveConnOpts := &http2.ServeConnOpts{
58+
Handler: handler,
9259
}
93-
return n, err
94-
}
95-
}
9660

97-
// Pseudocode:
98-
// if rc.IsHTTP2() {
99-
// if rc.replayCalled {
100-
// rc.reader == io.MultiReader(&rc.buf, rc.Conn)
101-
// } else {
102-
// rc.reader == io.TeeReader(conn, &rc.buf)
103-
// }
104-
// } else {
105-
// rc.reader == rc.Conn
106-
// }
107-
return rc.reader.Read(p)
108-
}
61+
serveH2 := func(conn net.Conn) {
62+
h2.ServeConn(conn, serveConnOpts)
63+
}
10964

110-
type replayableConnListener struct {
111-
net.Listener
112-
}
65+
serveConn := ServeHandler(stopper, handler, anyL, tlsConfig)
11366

114-
func (ml *replayableConnListener) Accept() (net.Conn, error) {
115-
conn, err := ml.Listener.Accept()
116-
if err == nil {
117-
conn = newReplayableConn(conn)
118-
}
119-
return conn, err
120-
}
67+
stopper.RunWorker(func() {
68+
if err := serveConn(h2L, serveH2); err != nil && !IsClosedConnection(err) {
69+
log.Fatal(err)
70+
}
71+
})
12172

122-
// Listen delegates to `net.Listen` and, if tlsConfig is not nil, to `tls.NewListener`.
123-
func Listen(addr net.Addr, tlsConfig *tls.Config) (net.Listener, error) {
124-
ln, err := net.Listen(addr.Network(), addr.String())
125-
if err == nil {
126-
if tlsConfig != nil {
127-
ln = tls.NewListener(ln, tlsConfig)
128-
} else {
129-
ln = &replayableConnListener{ln}
73+
stopper.RunWorker(func() {
74+
if err := m.Serve(); err != nil && !IsClosedConnection(err) {
75+
log.Fatal(err)
76+
}
77+
})
13078
}
13179
}
132-
13380
return ln, err
13481
}
13582

136-
// ListenAndServe creates a listener and serves handler on it, closing
137-
// the listener when signalled by the stopper.
138-
func ListenAndServe(stopper *stop.Stopper, handler http.Handler, addr net.Addr, tlsConfig *tls.Config) (net.Listener, error) {
139-
ln, err := Listen(addr, tlsConfig)
140-
if err != nil {
141-
return nil, err
142-
}
143-
83+
// ServeHandler serves the handler on the listener.
84+
func ServeHandler(stopper *stop.Stopper, handler http.Handler, ln net.Listener, tlsConfig *tls.Config) func(net.Listener, func(net.Conn)) error {
14485
var mu sync.Mutex
14586
activeConns := make(map[net.Conn]struct{})
14687

88+
logger := log.NewStdLogger(log.ErrorLog)
14789
httpServer := http.Server{
148-
TLSConfig: tlsConfig,
14990
Handler: handler,
91+
TLSConfig: tlsConfig,
15092
ConnState: func(conn net.Conn, state http.ConnState) {
15193
mu.Lock()
15294
switch state {
@@ -157,61 +99,61 @@ func ListenAndServe(stopper *stop.Stopper, handler http.Handler, addr net.Addr,
15799
}
158100
mu.Unlock()
159101
},
160-
ErrorLog: log.NewStdLogger(log.ErrorLog),
161-
}
162-
163-
var http2Server http2.Server
164-
165-
if tlsConfig == nil {
166-
connOpts := http2.ServeConnOpts{
167-
BaseConfig: &httpServer,
168-
Handler: handler,
169-
}
170-
171-
httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
172-
if r.ProtoMajor == 2 {
173-
if conn, _, err := w.(http.Hijacker).Hijack(); err == nil {
174-
http2Server.ServeConn(conn.(*replayableConn).replay(), &connOpts)
175-
} else {
176-
log.Fatal(err)
177-
}
178-
} else {
179-
handler.ServeHTTP(w, r)
180-
}
181-
})
102+
ErrorLog: logger,
182103
}
183104

184-
if err := http2.ConfigureServer(&httpServer, &http2Server); err != nil {
185-
return nil, err
105+
// net/http.(*Server).Serve/http2.ConfigureServer are not thread safe with
106+
// respect to net/http.(*Server).TLSConfig, so we call it synchronously here.
107+
if err := http2.ConfigureServer(&httpServer, nil); err != nil {
108+
log.Fatal(err)
186109
}
187110

188-
stopper.RunWorker(func() {
189-
<-stopper.ShouldDrain()
190-
// Some unit tests manually close `ln`, so it may already be closed
191-
// when we get here.
192-
if err := ln.Close(); err != nil && !IsClosedConnection(err) {
193-
log.Fatal(err)
194-
}
195-
})
196-
197111
stopper.RunWorker(func() {
198112
if err := httpServer.Serve(ln); err != nil && !IsClosedConnection(err) {
199113
log.Fatal(err)
200114
}
201115

202116
<-stopper.ShouldStop()
203-
204117
mu.Lock()
205118
for conn := range activeConns {
206119
conn.Close()
207120
}
208121
mu.Unlock()
209122
})
210123

211-
return ln, nil
124+
logFn := logger.Printf
125+
return func(l net.Listener, serveConn func(net.Conn)) error {
126+
// Inspired by net/http.(*Server).Serve
127+
var tempDelay time.Duration // how long to sleep on accept failure
128+
for {
129+
rw, e := l.Accept()
130+
if e != nil {
131+
if ne, ok := e.(net.Error); ok && ne.Temporary() {
132+
if tempDelay == 0 {
133+
tempDelay = 5 * time.Millisecond
134+
} else {
135+
tempDelay *= 2
136+
}
137+
if max := 1 * time.Second; tempDelay > max {
138+
tempDelay = max
139+
}
140+
logFn("http: Accept error: %v; retrying in %v", e, tempDelay)
141+
time.Sleep(tempDelay)
142+
continue
143+
}
144+
return e
145+
}
146+
tempDelay = 0
147+
go func() {
148+
httpServer.ConnState(rw, http.StateNew) // before Serve can return
149+
serveConn(rw)
150+
httpServer.ConnState(rw, http.StateClosed)
151+
}()
152+
}
153+
}
212154
}
213155

214156
// IsClosedConnection returns true if err is the net package's errClosed.
215157
func IsClosedConnection(err error) bool {
216-
return strings.Contains(err.Error(), "use of closed network connection")
158+
return err == cmux.ErrListenerClosed || strings.Contains(err.Error(), "use of closed network connection")
217159
}

0 commit comments

Comments
 (0)