Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging to release-4-lts: [TT-9284] Ensure that old transport will close idle connections (#5231) #5741

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gateway/proxy_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ func (m *proxyMux) serve(gw *Gateway) {
WriteTimeout: writeTimeout,
Handler: h,
}
if gw.ConnectionWatcher != nil {
p.httpServer.ConnState = gw.ConnectionWatcher.OnStateChange
}

if conf.CloseConnections {
p.httpServer.SetKeepAlivesEnabled(false)
Expand Down
15 changes: 15 additions & 0 deletions gateway/reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ type ReverseProxy struct {
Gw *Gateway `json:"-"`
}

var idleConnTimeout = 90

func (p *ReverseProxy) defaultTransport(dialerTimeout float64) *http.Transport {
timeout := 30.0
if dialerTimeout > 0 {
Expand All @@ -413,6 +415,7 @@ func (p *ReverseProxy) defaultTransport(dialerTimeout float64) *http.Transport {
DialContext: dialContextFunc,
MaxIdleConns: p.Gw.GetConfig().MaxIdleConns,
MaxIdleConnsPerHost: p.Gw.GetConfig().MaxIdleConnsPerHost, // default is 100
IdleConnTimeout: time.Duration(idleConnTimeout) * time.Second,
ResponseHeaderTimeout: time.Duration(dialerTimeout) * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
Expand Down Expand Up @@ -1274,9 +1277,21 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
}

if createTransport {
var oldTransport *http.Transport

if p.TykAPISpec.HTTPTransport != nil {
oldTransport = p.TykAPISpec.HTTPTransport.transport
// Prevent new idle connections to be generated.
oldTransport.DisableKeepAlives = true
}

_, timeout := p.CheckHardTimeoutEnforced(p.TykAPISpec, req)
p.TykAPISpec.HTTPTransport = p.httpTransport(timeout, rw, req, outreq)
p.TykAPISpec.HTTPTransportCreated = time.Now()

if oldTransport != nil {
oldTransport.CloseIdleConnections()
}
}

roundTripper = p.TykAPISpec.HTTPTransport
Expand Down
16 changes: 12 additions & 4 deletions gateway/reverse_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,21 @@ func (s *Test) TestNewWrappedServeHTTP() *ReverseProxy {
}

func TestWrappedServeHTTP(t *testing.T) {
idleConnTimeout = 1

ts := StartTest(nil)
defer ts.Close()

proxy := ts.TestNewWrappedServeHTTP()
recorder := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/", nil)
proxy.WrappedServeHTTP(recorder, req, false)
for i := 0; i < 10; i++ {
proxy := ts.TestNewWrappedServeHTTP()
recorder := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/", nil)
proxy.WrappedServeHTTP(recorder, req, false)
}

assert.Equal(t, 10, ts.Gw.ConnectionWatcher.Count())
time.Sleep(time.Second * 2)
assert.Equal(t, 0, ts.Gw.ConnectionWatcher.Count())
}

func TestCircuitBreaker5xxs(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/TykTechnologies/tyk/internal/crypto"
"github.com/TykTechnologies/tyk/internal/httputil"

"sync/atomic"
textTemplate "text/template"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Gateway struct {
DashService DashboardServiceSender
CertificateManager *certs.CertificateManager
GlobalHostChecker HostCheckerManager
ConnectionWatcher *httputil.ConnectionWatcher
HostCheckTicker chan struct{}
HostCheckerClient *http.Client

Expand Down Expand Up @@ -213,6 +215,7 @@ func NewGateway(config config.Config, ctx context.Context, cancelFn context.Canc
gw.HostCheckerClient = &http.Client{
Timeout: 500 * time.Millisecond,
}
gw.ConnectionWatcher = httputil.NewConnectionWatcher()

gw.SessionCache = cache.New(10*time.Second, 5*time.Second)
gw.ExpiryCache = cache.New(600*time.Second, 10*time.Minute)
Expand Down
5 changes: 3 additions & 2 deletions gateway/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/gorilla/websocket"
"golang.org/x/net/context"

"github.com/TykTechnologies/tyk/internal/httputil"
"github.com/TykTechnologies/tyk/internal/uuid"

"github.com/TykTechnologies/graphql-go-tools/pkg/execution/datasource"
Expand Down Expand Up @@ -1018,10 +1019,9 @@ func (s *Test) BootstrapGw(ctx context.Context, cancelFn context.CancelFunc, gen
}
gwConfig.CoProcessOptions = s.config.CoprocessConfig

s.gwMu.Lock()
s.Gw = NewGateway(gwConfig, ctx, cancelFn)
s.Gw.setTestMode(true)
s.gwMu.Unlock()
s.Gw.ConnectionWatcher = httputil.NewConnectionWatcher()

s.MockHandle = MockHandle

Expand Down Expand Up @@ -1067,6 +1067,7 @@ func (s *Test) BootstrapGw(ctx context.Context, cancelFn context.CancelFunc, gen
Handler: s.TestServerRouter,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
ConnState: s.Gw.ConnectionWatcher.OnStateChange,
MaxHeaderBytes: 1 << 20,
}

Expand Down
40 changes: 40 additions & 0 deletions internal/httputil/connection_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package httputil

import (
"net"
"net/http"
"sync/atomic"
)

// ConnectionWatcher counts http server connections.
type ConnectionWatcher struct {
n int64
}

// NewConnectionWatcher returns a new *ConnectionWatcher.
func NewConnectionWatcher() *ConnectionWatcher {
return &ConnectionWatcher{}
}

// OnStateChange records open connections in response to connection
// state changes. Set net/http Server.ConnState to this method
// as value.
func (cw *ConnectionWatcher) OnStateChange(_ net.Conn, state http.ConnState) {
switch state {
case http.StateNew:
cw.Add(1)
case http.StateHijacked, http.StateClosed:
cw.Add(-1)
}
}

// Count returns the number of connections at the time the call.

func (cw *ConnectionWatcher) Count() int {
return int(atomic.LoadInt64(&cw.n))
}

// Add adds c to the number of active connections.
func (cw *ConnectionWatcher) Add(c int64) {
atomic.AddInt64(&cw.n, c)
}
28 changes: 28 additions & 0 deletions internal/httputil/connection_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package httputil_test

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"

"github.com/TykTechnologies/tyk/internal/httputil"
)

func TestConnectionWatcher(t *testing.T) {
w := httputil.NewConnectionWatcher()
w.Add(1)
assert.Equal(t, 1, w.Count())
w.Add(2)
assert.Equal(t, 3, w.Count())
w.Add(-3)
assert.Equal(t, 0, w.Count())

w.OnStateChange(nil, http.StateNew)
assert.Equal(t, 1, w.Count())

w.OnStateChange(nil, http.StateClosed)
w.OnStateChange(nil, http.StateHijacked)
assert.Equal(t, -1, w.Count())

}
Loading