feat: upstream retries and timeout (#254)
* feat: upstream retries and timeout

* doc: add usage and specification for retries and timeout

* test: add e2e cases

* fix: style
tokers authored Feb 18, 2021
1 parent ece373c commit 09787ed
Showing 11 changed files with 334 additions and 20 deletions.
44 changes: 44 additions & 0 deletions docs/CRD-specification.md
@@ -27,6 +27,7 @@ In order to control the behavior of the proxy ([Apache APISIX](https://github.co
- [ApisixUpstream](#apisixupstream)
- [Configuring Load Balancer](#configuring-load-balancer)
- [Configuring Health Check](#configuring-health-check)
- [Configuring Retry and Timeout](#configuring-retry-and-timeout)
- [Port Level Settings](#port-level-settings)
- [Configuration References](#configuration-references)
- [ApisixTls](#apisixtls)
@@ -200,6 +201,44 @@ up once the healthy conditions are met (three consecutive requests got good stat
Note that the active health checker somewhat duplicates the liveness/readiness probes, but it is required if the passive feedback mechanism is in use. So once you use the health check feature in ApisixUpstream,
the active health checker is mandatory.

### Configuring Retry and Timeout

You may want the proxy to retry requests that fail because of transient network errors
or an unavailable service. By default the retry count is `1`; you can change it by specifying the `retries` field.

The following configuration sets `retries` to `3`, which means at most `3` requests will be sent to
the endpoints of the Kubernetes Service `httpbin`.

Bear in mind that a request can be passed to the next endpoint only
if nothing has been sent to the client yet. That is, if an error or timeout occurs in the middle
of transferring a response, the request cannot be retried.

```yaml
apiVersion: apisix.apache.org/v1
kind: ApisixUpstream
metadata:
  name: httpbin
spec:
  retries: 3
```

The default connect, read and send timeouts are all `60s`, which might not be suitable for some applications.
You can change them in the `timeout` field.

```yaml
apiVersion: apisix.apache.org/v1
kind: ApisixUpstream
metadata:
  name: httpbin
spec:
  timeout:
    connect: 5s
    read: 10s
    send: 10s
```

The above example sets the connect, read and send timeouts to `5s`, `10s` and `10s` respectively.
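
The timeout values above use Go's duration syntax. As a minimal sketch, the standard library's `time.ParseDuration` shows how such strings break down into whole seconds, which is the unit the controller hands to APISIX in this commit. `toSeconds` is a hypothetical helper for illustration, not part of the controller.

```go
package main

import (
	"fmt"
	"time"
)

// toSeconds parses a duration string such as "5s" or "1m30s" and returns
// the whole number of seconds, truncating any fractional part.
func toSeconds(s string) (int, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, err
	}
	return int(d.Seconds()), nil
}

func main() {
	for _, s := range []string{"5s", "10s", "1m30s", "500ms"} {
		secs, err := toSeconds(s)
		fmt.Println(s, secs, err)
	}
}
```

Note that a sub-second value such as `500ms` truncates to `0` under this conversion, so whole-second values are the safer choice.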

### Port Level Settings

Once in a while a single Kubernetes Service might expose multiple ports which provide distinct functions, so different Upstream configurations are required.
@@ -250,6 +289,11 @@ In the meanwhile, the ApisixUpstream `foo` sets `http` scheme for port `7000` an
| loadbalancer.type | string | The load balancing type, can be `roundrobin`, `ewma`, `least_conn`, `chash`, default is `roundrobin`. |
| loadbalancer.hashOn | string | The hash value source scope, which only takes effect if the `chash` algorithm is in use. Values can be `vars`, `header`, `vars_combinations`, `cookie` and `consumers`, default is `vars`. |
| loadbalancer.key | string | The hash key, only valid if the `chash` algorithm is used. |
| retries | int | The retry count. |
| timeout | object | The timeout settings. |
| timeout.connect | time duration in the form "72h3m0.5s" | The connect timeout. |
| timeout.read | time duration in the form "72h3m0.5s" | The read timeout. |
| timeout.send | time duration in the form "72h3m0.5s" | The send timeout. |
| healthCheck | object | The health check parameters, see [Health Check](https://github.com/apache/apisix/blob/master/doc/health-check.md) for more details. |
| healthCheck.active | object | active health check configuration, which is a mandatory field. |
| healthCheck.active.type | string | health check type, can be `http`, `https` and `tcp`, default is `http`. |
14 changes: 8 additions & 6 deletions pkg/apisix/resource.go
@@ -155,12 +155,14 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
 			Group: clusterName,
 			Name:  ups.Desc,
 		},
-		Type:   ups.LBType,
-		Key:    ups.Key,
-		HashOn: ups.HashOn,
-		Nodes:  nodes,
-		Scheme: ups.Scheme,
-		Checks: ups.Checks,
+		Type:    ups.LBType,
+		Key:     ups.Key,
+		HashOn:  ups.HashOn,
+		Nodes:   nodes,
+		Scheme:  ups.Scheme,
+		Checks:  ups.Checks,
+		Retries: ups.Retries,
+		Timeout: ups.Timeout,
 	}, nil
 }

32 changes: 18 additions & 14 deletions pkg/apisix/upstream.go
@@ -62,13 +62,15 @@ func (n *upstreamNodes) UnmarshalJSON(p []byte) error {
}

 type upstreamReqBody struct {
-	LBType string                  `json:"type"`
-	HashOn string                  `json:"hash_on,omitempty"`
-	Key    string                  `json:"key,omitempty"`
-	Nodes  upstreamNodes           `json:"nodes"`
-	Desc   string                  `json:"desc"`
-	Scheme string                  `json:"scheme,omitempty"`
-	Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"`
+	LBType  string                  `json:"type"`
+	HashOn  string                  `json:"hash_on,omitempty"`
+	Key     string                  `json:"key,omitempty"`
+	Nodes   upstreamNodes           `json:"nodes"`
+	Desc    string                  `json:"desc"`
+	Scheme  string                  `json:"scheme,omitempty"`
+	Retries int                     `json:"retries,omitempty"`
+	Timeout *v1.UpstreamTimeout     `json:"timeout,omitempty"`
+	Checks  *v1.UpstreamHealthCheck `json:"checks,omitempty"`
 }

type upstreamItem upstreamReqBody
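
The `omitempty` tags on the new `Retries` and `Timeout` fields decide what reaches the APISIX Admin API. The following is a minimal, standalone sketch of that behavior using only `encoding/json`; `timeoutBody`, `reqBody` and `encode` are hypothetical, trimmed-down stand-ins for the types in this commit, not the controller's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// timeoutBody and reqBody are hypothetical stand-ins for v1.UpstreamTimeout
// and upstreamReqBody, keeping only the fields relevant here.
type timeoutBody struct {
	Connect int `json:"connect"`
	Send    int `json:"send"`
	Read    int `json:"read"`
}

type reqBody struct {
	LBType  string       `json:"type"`
	Retries int          `json:"retries,omitempty"`
	Timeout *timeoutBody `json:"timeout,omitempty"`
}

// encode marshals the body and returns the JSON as a string.
func encode(b reqBody) string {
	p, err := json.Marshal(b)
	if err != nil {
		panic(err)
	}
	return string(p)
}

func main() {
	// Zero retries and a nil timeout are both dropped from the payload.
	fmt.Println(encode(reqBody{LBType: "roundrobin"}))
	// Once a timeout is set, all three of its fields are serialized.
	fmt.Println(encode(reqBody{
		LBType:  "roundrobin",
		Retries: 3,
		Timeout: &timeoutBody{Connect: 5, Send: 10, Read: 10},
	}))
}
```

One consequence of `omitempty` on an `int` is that an explicit `retries: 0` is indistinguishable from an unset field, so it is never sent.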
@@ -192,13 +194,15 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
 		})
 	}
 	body, err := json.Marshal(upstreamReqBody{
-		LBType: obj.Type,
-		HashOn: obj.HashOn,
-		Key:    obj.Key,
-		Nodes:  nodes,
-		Desc:   obj.Name,
-		Scheme: obj.Scheme,
-		Checks: obj.Checks,
+		LBType:  obj.Type,
+		HashOn:  obj.HashOn,
+		Key:     obj.Key,
+		Nodes:   nodes,
+		Desc:    obj.Name,
+		Scheme:  obj.Scheme,
+		Checks:  obj.Checks,
+		Retries: obj.Retries,
+		Timeout: obj.Timeout,
 	})
 	if err != nil {
 		return nil, err
16 changes: 16 additions & 0 deletions pkg/kube/apisix/apis/config/v1/types.go
@@ -105,11 +105,27 @@ type ApisixUpstreamConfig struct {
// +optional
Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`

// How many times the proxy (Apache APISIX) should retry when
// errors occur (error, timeout or bad HTTP status codes like 500, 502).
// +optional
Retries int `json:"retries,omitempty" yaml:"retries,omitempty"`

// Timeout settings for connecting to, sending to and reading from the upstream.
// +optional
Timeout *UpstreamTimeout `json:"timeout,omitempty" yaml:"timeout,omitempty"`

// The health check configurations for the upstream.
// +optional
HealthCheck *HealthCheck `json:"healthCheck,omitempty" yaml:"healthCheck,omitempty"`
}

// UpstreamTimeout holds the connect, send and read timeout settings for the upstream.
type UpstreamTimeout struct {
	Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"`
	Send    metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"`
	Read    metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"`
}

// PortLevelSettings configures the ApisixUpstreamConfig for each individual port. It inherits
// configurations from the outer level (the whole Kubernetes Service) and overrides some of
// them if they are set on the port level.
26 changes: 26 additions & 0 deletions pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions pkg/kube/apisix_upstream.go
@@ -19,6 +19,55 @@ import (
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

func (t *translator) translateUpstreamRetriesAndTimeout(retries int, timeout *configv1.UpstreamTimeout, ups *apisixv1.Upstream) error {
	if retries < 0 {
		return &translateError{
			field:  "retries",
			reason: "invalid value",
		}
	}
	ups.Retries = retries
	if timeout == nil {
		return nil
	}

	// The timeout schema doesn't allow configuring only one or two
	// items, so start from the default value for all three.
	connTimeout := apisixv1.DefaultUpstreamTimeout
	readTimeout := apisixv1.DefaultUpstreamTimeout
	sendTimeout := apisixv1.DefaultUpstreamTimeout
	if timeout.Connect.Duration < 0 {
		return &translateError{
			field:  "timeout.connect",
			reason: "invalid value",
		}
	} else if timeout.Connect.Duration > 0 {
		connTimeout = int(timeout.Connect.Seconds())
	}
	if timeout.Read.Duration < 0 {
		return &translateError{
			field:  "timeout.read",
			reason: "invalid value",
		}
	} else if timeout.Read.Duration > 0 {
		readTimeout = int(timeout.Read.Seconds())
	}
	if timeout.Send.Duration < 0 {
		return &translateError{
			field:  "timeout.send",
			reason: "invalid value",
		}
	} else if timeout.Send.Duration > 0 {
		sendTimeout = int(timeout.Send.Seconds())
	}
	ups.Timeout = &apisixv1.UpstreamTimeout{
		Connect: connTimeout,
		Send:    sendTimeout,
		Read:    readTimeout,
	}
	return nil
}

func (t *translator) translateUpstreamScheme(scheme string, ups *apisixv1.Upstream) error {
if scheme == "" {
ups.Scheme = apisixv1.SchemeHTTP
36 changes: 36 additions & 0 deletions pkg/kube/apisix_upstream_test.go
@@ -369,3 +369,39 @@ func TestTranslateUpstreamActiveHealthCheckUnusually(t *testing.T) {
reason: "invalid value",
})
}

func TestUpstreamRetriesAndTimeout(t *testing.T) {
	tr := &translator{}
	err := tr.translateUpstreamRetriesAndTimeout(-1, nil, nil)
	assert.Equal(t, err, &translateError{
		field:  "retries",
		reason: "invalid value",
	})

	var ups apisixv1.Upstream
	err = tr.translateUpstreamRetriesAndTimeout(3, nil, &ups)
	assert.Nil(t, err)
	assert.Equal(t, ups.Retries, 3)

	timeout := &configv1.UpstreamTimeout{
		Connect: metav1.Duration{Duration: time.Second},
		Read:    metav1.Duration{Duration: -1},
	}
	err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
	assert.Equal(t, err, &translateError{
		field:  "timeout.read",
		reason: "invalid value",
	})

	timeout = &configv1.UpstreamTimeout{
		Connect: metav1.Duration{Duration: time.Second},
		Read:    metav1.Duration{Duration: 15 * time.Second},
	}
	err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups)
	assert.Nil(t, err)
	assert.Equal(t, ups.Timeout, &apisixv1.UpstreamTimeout{
		Connect: 1,
		Send:    60,
		Read:    15,
	})
}
3 changes: 3 additions & 0 deletions pkg/kube/translator.go
@@ -83,6 +83,9 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig)
if err := t.translateUpstreamHealthCheck(au.HealthCheck, ups); err != nil {
return nil, err
}
if err := t.translateUpstreamRetriesAndTimeout(au.Retries, au.Timeout, ups); err != nil {
return nil, err
}
return ups, nil
}

15 changes: 15 additions & 0 deletions pkg/types/apisix/v1/types.go
@@ -62,6 +62,9 @@ const (
// ActiveHealthCheckMinInterval is the minimum interval for
// the active health check.
ActiveHealthCheckMinInterval = time.Second

// DefaultUpstreamTimeout is the default connect, read and send timeout (in seconds) for upstreams.
DefaultUpstreamTimeout = 60
)

// Metadata contains all meta information about resources.
@@ -130,6 +133,18 @@ type Upstream struct {
Nodes []UpstreamNode `json:"nodes,omitempty" yaml:"nodes,omitempty"`
FromKind string `json:"from_kind,omitempty" yaml:"from_kind,omitempty"`
Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
Retries int `json:"retries,omitempty" yaml:"retries,omitempty"`
Timeout *UpstreamTimeout `json:"timeout,omitempty" yaml:"timeout,omitempty"`
}

// UpstreamTimeout represents the timeout settings on Upstream.
type UpstreamTimeout struct {
	// Connect is the connect timeout in seconds.
	Connect int `json:"connect" yaml:"connect"`
	// Send is the send timeout in seconds.
	Send int `json:"send" yaml:"send"`
	// Read is the read timeout in seconds.
	Read int `json:"read" yaml:"read"`
}

// Node the node in upstream
5 changes: 5 additions & 0 deletions pkg/types/apisix/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
