Skip to content

Commit

Permalink
Merging to release-4-lts: [TT-9284] Ensure that old transport will cl…
Browse files Browse the repository at this point in the history
…ose idle connections (#5231) (#5741)

[TT-9284] Ensure that old transport will close idle connections (#5231)

<details open>
  <br />
  <table>
    <tr>
      <th>Summary</th>
<td>[CS] The gateway leaks file handles via sockets when max_conn_time
is above 0</td>
    </tr>
    <tr>
      <th>Type</th>
      <td>
<img alt="Bug"

src="https://tyktech.atlassian.net/rest/api/2/universal_avatar/view/type/issuetype/avatar/10303?size=medium"
/>
        Bug
      </td>
    </tr>
    <tr>
      <th>Status</th>
      <td>In Code Review</td>
    </tr>
    <tr>
      <th>Points</th>
      <td>N/A</td>
    </tr>
  </table>
</details>
<!--
  do not remove this marker as it will break jira-lint's functionality.
  added_by_jira_lint
-->

---

TBD

## Description

<!-- Describe your changes in detail -->

## Related Issue

<!-- This project only accepts pull requests related to open issues. -->
<!-- If suggesting a new feature or change, please discuss it in an
issue first. -->
<!-- If fixing a bug, there should be an issue describing it with steps
to reproduce. -->
<!-- OSS: Please link to the issue here. Tyk: please create/link the
JIRA ticket. -->

## Motivation and Context

<!-- Why is this change required? What problem does it solve? -->

## How This Has Been Tested

<!-- Please describe in detail how you tested your changes -->
<!-- Include details of your testing environment, and the tests -->
<!-- you ran to see how your change affects other areas of the code,
etc. -->
<!-- This information is helpful for reviewers and QA. -->

## Screenshots (if appropriate)

## Types of changes

<!-- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Refactoring or add test (improvements in base code or adds test
coverage to functionality)

## Checklist

<!-- Go over all the following points, and put an `x` in all the boxes
that apply -->
<!-- If there are no documentation updates required, mark the item as
checked. -->
<!-- Raise up any additional concerns not covered by the checklist. -->

- [ ] I ensured that the documentation is up to date
- [ ] I explained why this PR updates go.mod in detail with reasoning
why it's required
- [ ] I would like a code coverage CI quality gate exception and have
explained why

---------

Co-authored-by: Tit Petric <tit@tyk.io>

[TT-9284]:
https://tyktech.atlassian.net/browse/TT-9284?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Tit Petric <tit@tyk.io>
  • Loading branch information
2 people authored and lghiur committed Jan 22, 2024
1 parent 271ee77 commit e34a6a9
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 6 deletions.
3 changes: 3 additions & 0 deletions gateway/proxy_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ func (m *proxyMux) serve(gw *Gateway) {
WriteTimeout: writeTimeout,
Handler: h,
}
if gw.ConnectionWatcher != nil {
p.httpServer.ConnState = gw.ConnectionWatcher.OnStateChange
}

if conf.CloseConnections {
p.httpServer.SetKeepAlivesEnabled(false)
Expand Down
15 changes: 15 additions & 0 deletions gateway/reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ type ReverseProxy struct {
Gw *Gateway `json:"-"`
}

var idleConnTimeout = 90

func (p *ReverseProxy) defaultTransport(dialerTimeout float64) *http.Transport {
timeout := 30.0
if dialerTimeout > 0 {
Expand All @@ -413,6 +415,7 @@ func (p *ReverseProxy) defaultTransport(dialerTimeout float64) *http.Transport {
DialContext: dialContextFunc,
MaxIdleConns: p.Gw.GetConfig().MaxIdleConns,
MaxIdleConnsPerHost: p.Gw.GetConfig().MaxIdleConnsPerHost, // default is 100
IdleConnTimeout: time.Duration(idleConnTimeout) * time.Second,
ResponseHeaderTimeout: time.Duration(dialerTimeout) * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
Expand Down Expand Up @@ -1274,9 +1277,21 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
}

if createTransport {
var oldTransport *http.Transport

if p.TykAPISpec.HTTPTransport != nil {
oldTransport = p.TykAPISpec.HTTPTransport.transport
// Prevent new idle connections to be generated.
oldTransport.DisableKeepAlives = true
}

_, timeout := p.CheckHardTimeoutEnforced(p.TykAPISpec, req)
p.TykAPISpec.HTTPTransport = p.httpTransport(timeout, rw, req, outreq)
p.TykAPISpec.HTTPTransportCreated = time.Now()

if oldTransport != nil {
oldTransport.CloseIdleConnections()
}
}

roundTripper = p.TykAPISpec.HTTPTransport
Expand Down
16 changes: 12 additions & 4 deletions gateway/reverse_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,21 @@ func (s *Test) TestNewWrappedServeHTTP() *ReverseProxy {
}

func TestWrappedServeHTTP(t *testing.T) {
idleConnTimeout = 1

ts := StartTest(nil)
defer ts.Close()

proxy := ts.TestNewWrappedServeHTTP()
recorder := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/", nil)
proxy.WrappedServeHTTP(recorder, req, false)
for i := 0; i < 10; i++ {
proxy := ts.TestNewWrappedServeHTTP()
recorder := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/", nil)
proxy.WrappedServeHTTP(recorder, req, false)
}

assert.Equal(t, 10, ts.Gw.ConnectionWatcher.Count())
time.Sleep(time.Second * 2)
assert.Equal(t, 0, ts.Gw.ConnectionWatcher.Count())
}

func TestCircuitBreaker5xxs(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/TykTechnologies/tyk/internal/crypto"
"github.com/TykTechnologies/tyk/internal/httputil"

"sync/atomic"
textTemplate "text/template"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Gateway struct {
DashService DashboardServiceSender
CertificateManager *certs.CertificateManager
GlobalHostChecker HostCheckerManager
ConnectionWatcher *httputil.ConnectionWatcher
HostCheckTicker chan struct{}
HostCheckerClient *http.Client

Expand Down Expand Up @@ -213,6 +215,7 @@ func NewGateway(config config.Config, ctx context.Context, cancelFn context.Canc
gw.HostCheckerClient = &http.Client{
Timeout: 500 * time.Millisecond,
}
gw.ConnectionWatcher = httputil.NewConnectionWatcher()

gw.SessionCache = cache.New(10*time.Second, 5*time.Second)
gw.ExpiryCache = cache.New(600*time.Second, 10*time.Minute)
Expand Down
5 changes: 3 additions & 2 deletions gateway/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/gorilla/websocket"
"golang.org/x/net/context"

"github.com/TykTechnologies/tyk/internal/httputil"
"github.com/TykTechnologies/tyk/internal/uuid"

"github.com/TykTechnologies/graphql-go-tools/pkg/execution/datasource"
Expand Down Expand Up @@ -1018,10 +1019,9 @@ func (s *Test) BootstrapGw(ctx context.Context, cancelFn context.CancelFunc, gen
}
gwConfig.CoProcessOptions = s.config.CoprocessConfig

s.gwMu.Lock()
s.Gw = NewGateway(gwConfig, ctx, cancelFn)
s.Gw.setTestMode(true)
s.gwMu.Unlock()
s.Gw.ConnectionWatcher = httputil.NewConnectionWatcher()

s.MockHandle = MockHandle

Expand Down Expand Up @@ -1067,6 +1067,7 @@ func (s *Test) BootstrapGw(ctx context.Context, cancelFn context.CancelFunc, gen
Handler: s.TestServerRouter,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
ConnState: s.Gw.ConnectionWatcher.OnStateChange,
MaxHeaderBytes: 1 << 20,
}

Expand Down
40 changes: 40 additions & 0 deletions internal/httputil/connection_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package httputil

import (
"net"
"net/http"
"sync/atomic"
)

// ConnectionWatcher counts http server connections.
type ConnectionWatcher struct {
n int64
}

// NewConnectionWatcher returns a new *ConnectionWatcher.
func NewConnectionWatcher() *ConnectionWatcher {
return &ConnectionWatcher{}
}

// OnStateChange records open connections in response to connection
// state changes. Set net/http Server.ConnState to this method
// as value.
func (cw *ConnectionWatcher) OnStateChange(_ net.Conn, state http.ConnState) {
switch state {
case http.StateNew:
cw.Add(1)
case http.StateHijacked, http.StateClosed:
cw.Add(-1)
}
}

// Count returns the number of connections at the time the call.

func (cw *ConnectionWatcher) Count() int {
return int(atomic.LoadInt64(&cw.n))
}

// Add adds c to the number of active connections.
func (cw *ConnectionWatcher) Add(c int64) {
atomic.AddInt64(&cw.n, c)
}
28 changes: 28 additions & 0 deletions internal/httputil/connection_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package httputil_test

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"

"github.com/TykTechnologies/tyk/internal/httputil"
)

func TestConnectionWatcher(t *testing.T) {
w := httputil.NewConnectionWatcher()
w.Add(1)
assert.Equal(t, 1, w.Count())
w.Add(2)
assert.Equal(t, 3, w.Count())
w.Add(-3)
assert.Equal(t, 0, w.Count())

w.OnStateChange(nil, http.StateNew)
assert.Equal(t, 1, w.Count())

w.OnStateChange(nil, http.StateClosed)
w.OnStateChange(nil, http.StateHijacked)
assert.Equal(t, -1, w.Count())

}

0 comments on commit e34a6a9

Please sign in to comment.