17
17
package util
18
18
19
19
import (
20
- "bytes"
21
20
"crypto/tls"
22
- "io"
23
21
"net"
24
22
"net/http"
25
23
"strings"
26
24
"sync"
25
+ "time"
27
26
27
+ "github.com/soheilhy/cmux"
28
28
"golang.org/x/net/http2"
29
29
30
30
"github.com/cockroachdb/cockroach/util/log"
31
31
"github.com/cockroachdb/cockroach/util/stop"
32
32
)
33
33
34
- const eol = "\r \n "
35
- const hostHeader = "Host: CRDB" + eol
36
-
37
- var headerInsertionIndex = int64 (strings .Index (http2 .ClientPreface , eol ) + len (eol ))
38
-
39
- type replayableConn struct {
40
- net.Conn
41
- buf bytes.Buffer
42
- reader io.Reader
43
- }
44
-
45
- func newReplayableConn (conn net.Conn ) * replayableConn {
46
- rc := replayableConn {Conn : conn }
47
- rc .reader = io .LimitReader (io .TeeReader (conn , & rc .buf ), headerInsertionIndex )
48
- return & rc
49
- }
50
-
51
- func (rc * replayableConn ) replay () * replayableConn {
52
- rc .reader = io .MultiReader (& rc .buf , rc .Conn )
53
- return rc
54
- }
34
+ // ListenAndServe creates a listener and serves handler on it, closing
35
+ // the listener when signalled by the stopper.
36
+ func ListenAndServe (stopper * stop.Stopper , handler http.Handler , addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
37
+ ln , err := net .Listen (addr .Network (), addr .String ())
38
+ if err == nil {
39
+ stopper .RunWorker (func () {
40
+ <- stopper .ShouldDrain ()
41
+ // Some unit tests manually close `ln`, so it may already be closed
42
+ // when we get here.
43
+ if err := ln .Close (); err != nil && ! IsClosedConnection (err ) {
44
+ log .Fatal (err )
45
+ }
46
+ })
55
47
56
- func (rc * replayableConn ) Read (p []byte ) (int , error ) {
57
- if limitReader , ok := rc .reader .(* io.LimitedReader ); ok {
58
- // rc.reader is a LimitedReader wrapping a TeeReader.
59
- off := headerInsertionIndex - limitReader .N
60
- n , err := rc .reader .Read (p )
61
- if ! strings .HasPrefix (http2 .ClientPreface [off :], string (p [:n ])) {
62
- // The incoming request is not an HTTP2 request, so buffering is no
63
- // longer required; send all reads directly to the underlying net.Conn.
64
- rc .reader = rc .Conn
65
- rc .buf = bytes.Buffer {} // Release the memory.
66
- } else if err == io .EOF {
67
- // We've exhausted our LimitedReader, which means he incoming request is
68
- // an HTTP2 request. Remember, that we are in this code path means that
69
- // TLS is not in use, which means the caller (net/http machinery) won't
70
- // be able to parse anything after http2.ClientPreface. However, Go 1.6
71
- // introduced strict Host header checking for HTTP >= 1.1 requests (see
72
- // https://github.com/golang/go/commit/6e11f45), and http2.ClientPreface
73
- // contains enough information for the caller to identify this first
74
- // request as an HTTP 2 request, but not enough for the caller to
75
- // determine the value of the Host header. On the next line, we're going
76
- // to help the caller out by providing a bogus HTTP 1.x-style Host
77
- // header. This will get us past Host header verification.
78
- //
79
- // Note that this bogus header won't reappear after replay is called.
80
- rc .reader = io .MultiReader (strings .NewReader (hostHeader ), limitReader .R )
48
+ if tlsConfig != nil {
49
+ ServeHandler (stopper , handler , tls .NewListener (ln , tlsConfig ), tlsConfig )
81
50
} else {
82
- // The LimitedReader isn't exhausted yet, or we hit an error.
83
- return n , err
84
- }
51
+ m := cmux .New (ln )
52
+ h2L := m .Match (cmux .HTTP2 ())
53
+ anyL := m .Match (cmux .Any ())
54
+
55
+ var h2 http2.Server
85
56
86
- // Cribbed from io.Multireader.
87
- if n > 0 || err != io .EOF {
88
- if err == io .EOF {
89
- // Don't return EOF yet. We've replaced our LimitedReader with rc.Conn or
90
- // a new MultiReader and there may be more bytes in there.
91
- err = nil
57
+ serveConnOpts := & http2.ServeConnOpts {
58
+ Handler : handler ,
92
59
}
93
- return n , err
94
- }
95
- }
96
60
97
- // Pseudocode:
98
- // if rc.IsHTTP2() {
99
- // if rc.replayCalled {
100
- // rc.reader == io.MultiReader(&rc.buf, rc.Conn)
101
- // } else {
102
- // rc.reader == io.TeeReader(conn, &rc.buf)
103
- // }
104
- // } else {
105
- // rc.reader == rc.Conn
106
- // }
107
- return rc .reader .Read (p )
108
- }
61
+ serveH2 := func (conn net.Conn ) {
62
+ h2 .ServeConn (conn , serveConnOpts )
63
+ }
109
64
110
- type replayableConnListener struct {
111
- net.Listener
112
- }
65
+ serveConn := ServeHandler (stopper , handler , anyL , tlsConfig )
113
66
114
- func (ml * replayableConnListener ) Accept () (net.Conn , error ) {
115
- conn , err := ml .Listener .Accept ()
116
- if err == nil {
117
- conn = newReplayableConn (conn )
118
- }
119
- return conn , err
120
- }
67
+ stopper .RunWorker (func () {
68
+ if err := serveConn (h2L , serveH2 ); err != nil && ! IsClosedConnection (err ) {
69
+ log .Fatal (err )
70
+ }
71
+ })
121
72
122
- // Listen delegates to `net.Listen` and, if tlsConfig is not nil, to `tls.NewListener`.
123
- func Listen (addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
124
- ln , err := net .Listen (addr .Network (), addr .String ())
125
- if err == nil {
126
- if tlsConfig != nil {
127
- ln = tls .NewListener (ln , tlsConfig )
128
- } else {
129
- ln = & replayableConnListener {ln }
73
+ stopper .RunWorker (func () {
74
+ if err := m .Serve (); err != nil && ! IsClosedConnection (err ) {
75
+ log .Fatal (err )
76
+ }
77
+ })
130
78
}
131
79
}
132
-
133
80
return ln , err
134
81
}
135
82
136
- // ListenAndServe creates a listener and serves handler on it, closing
137
- // the listener when signalled by the stopper.
138
- func ListenAndServe (stopper * stop.Stopper , handler http.Handler , addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
139
- ln , err := Listen (addr , tlsConfig )
140
- if err != nil {
141
- return nil , err
142
- }
143
-
83
+ // ServeHandler serves the handler on the listener.
84
+ func ServeHandler (stopper * stop.Stopper , handler http.Handler , ln net.Listener , tlsConfig * tls.Config ) func (net.Listener , func (net.Conn )) error {
144
85
var mu sync.Mutex
145
86
activeConns := make (map [net.Conn ]struct {})
146
87
88
+ logger := log .NewStdLogger (log .ErrorLog )
147
89
httpServer := http.Server {
148
- TLSConfig : tlsConfig ,
149
90
Handler : handler ,
91
+ TLSConfig : tlsConfig ,
150
92
ConnState : func (conn net.Conn , state http.ConnState ) {
151
93
mu .Lock ()
152
94
switch state {
@@ -157,61 +99,61 @@ func ListenAndServe(stopper *stop.Stopper, handler http.Handler, addr net.Addr,
157
99
}
158
100
mu .Unlock ()
159
101
},
160
- ErrorLog : log .NewStdLogger (log .ErrorLog ),
161
- }
162
-
163
- var http2Server http2.Server
164
-
165
- if tlsConfig == nil {
166
- connOpts := http2.ServeConnOpts {
167
- BaseConfig : & httpServer ,
168
- Handler : handler ,
169
- }
170
-
171
- httpServer .Handler = http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
172
- if r .ProtoMajor == 2 {
173
- if conn , _ , err := w .(http.Hijacker ).Hijack (); err == nil {
174
- http2Server .ServeConn (conn .(* replayableConn ).replay (), & connOpts )
175
- } else {
176
- log .Fatal (err )
177
- }
178
- } else {
179
- handler .ServeHTTP (w , r )
180
- }
181
- })
102
+ ErrorLog : logger ,
182
103
}
183
104
184
- if err := http2 .ConfigureServer (& httpServer , & http2Server ); err != nil {
185
- return nil , err
105
+ // net/http.(*Server).Serve/http2.ConfigureServer are not thread safe with
106
+ // respect to net/http.(*Server).TLSConfig, so we call it synchronously here.
107
+ if err := http2 .ConfigureServer (& httpServer , nil ); err != nil {
108
+ log .Fatal (err )
186
109
}
187
110
188
- stopper .RunWorker (func () {
189
- <- stopper .ShouldDrain ()
190
- // Some unit tests manually close `ln`, so it may already be closed
191
- // when we get here.
192
- if err := ln .Close (); err != nil && ! IsClosedConnection (err ) {
193
- log .Fatal (err )
194
- }
195
- })
196
-
197
111
stopper .RunWorker (func () {
198
112
if err := httpServer .Serve (ln ); err != nil && ! IsClosedConnection (err ) {
199
113
log .Fatal (err )
200
114
}
201
115
202
116
<- stopper .ShouldStop ()
203
-
204
117
mu .Lock ()
205
118
for conn := range activeConns {
206
119
conn .Close ()
207
120
}
208
121
mu .Unlock ()
209
122
})
210
123
211
- return ln , nil
124
+ logFn := logger .Printf
125
+ return func (l net.Listener , serveConn func (net.Conn )) error {
126
+ // Inspired by net/http.(*Server).Serve
127
+ var tempDelay time.Duration // how long to sleep on accept failure
128
+ for {
129
+ rw , e := l .Accept ()
130
+ if e != nil {
131
+ if ne , ok := e .(net.Error ); ok && ne .Temporary () {
132
+ if tempDelay == 0 {
133
+ tempDelay = 5 * time .Millisecond
134
+ } else {
135
+ tempDelay *= 2
136
+ }
137
+ if max := 1 * time .Second ; tempDelay > max {
138
+ tempDelay = max
139
+ }
140
+ logFn ("http: Accept error: %v; retrying in %v" , e , tempDelay )
141
+ time .Sleep (tempDelay )
142
+ continue
143
+ }
144
+ return e
145
+ }
146
+ tempDelay = 0
147
+ go func () {
148
+ httpServer .ConnState (rw , http .StateNew ) // before Serve can return
149
+ serveConn (rw )
150
+ httpServer .ConnState (rw , http .StateClosed )
151
+ }()
152
+ }
153
+ }
212
154
}
213
155
214
156
// IsClosedConnection returns true if err is the net package's errClosed.
215
157
func IsClosedConnection (err error ) bool {
216
- return strings .Contains (err .Error (), "use of closed network connection" )
158
+ return err == cmux . ErrListenerClosed || strings .Contains (err .Error (), "use of closed network connection" )
217
159
}
0 commit comments