diff --git a/docs/CRD-specification.md b/docs/CRD-specification.md index 9dffa0b61e..c63450acb4 100644 --- a/docs/CRD-specification.md +++ b/docs/CRD-specification.md @@ -27,6 +27,7 @@ In order to control the behavior of the proxy ([Apache APISIX](https://github.co - [ApisixUpstream](#apisixupstream) - [Configuring Load Balancer](#configuring-load-balancer) - [Configuring Health Check](#configuring-load-balancer) + - [Configuring Retry and Timeout](#configuring-retry-and-timeout) - [Port Level Settings](#port-level-settings) - [Configuration References](#configuration-references) - [ApisixTls](#apisixtls) @@ -200,6 +201,44 @@ up once the healthy conditions are met (three consecutive requests got good stat Note the active health checker is somewhat duplicated with the liveness/readiness probes but it's required if the passive feedback mechanism is in use. So once you use the health check feature in ApisixUpstream, the active health checker is mandatory. +### Configuring Retry and Timeout + +You may want the proxy to retry when requests occur faults like transient network errors +or service unavailable, by default the retry count is `1`. You can change it by specifying the `retries` field. + +The following configuration configures the `retries` to `3`, which indicates there'll be at most `3` requests sent to +Kubernetes service `httpbin`'s endpoints. + +One should bear in mind that passing a request to the next endpoint is only possible +if nothing has been sent to a client yet. That is, if an error or timeout occurs in the middle +of the transferring of a response, fixing this is impossible. + +```yaml +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: httpbin +spec: + retries: 3 +``` + +The default connect, read and send timeout are `60s`, which might not proper for some applicartions, +just change them in the `timeout` field. + +```yaml +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: httpbin +spec: + timeout: + connect: 5s + read: 10s + send: 10s +``` + +The above examples sets the connect, read and timeout to `5s`, `10s`, `10s` respectively. + ### Port Level Settings Once in a while a single Kubernetes Service might expose multiple ports which provides distinct functions and different Upstream configurations are required. @@ -250,6 +289,11 @@ In the meanwhile, the ApisixUpstream `foo` sets `http` scheme for port `7000` an | loadbalancer.type | string | The load balancing type, can be `roundrobin`, `ewma`, `least_conn`, `chash`, default is `roundrobin`. | | loadbalancer.hashOn | string | The hash value source scope, only take effects if the `chash` algorithm is in use. Values can `vars`, `header`, `vars_combinations`, `cookie` and `consumers`, default is `vars`. | | loadbalancer.key | string | The hash key, only in valid if the `chash` algorithm is used. +| retries | int | The retry count. | +| timeout | object | The timeout settings. | +| timeout.connect | time duration in the form "72h3m0.5s" | The connect timeout. | +| timeout.read | time duration in the form "72h3m0.5s" | The read timeout. | +| timeout.send | time duration in the form "72h3m0.5s" | The send timeout. | | healthCheck | object | The health check parameters, see [Health Check](https://github.com/apache/apisix/blob/master/doc/health-check.md) for more details. | | healthCheck.active | object | active health check configuration, which is a mandatory field. | | healthCheck.active.type | string | health check type, can be `http`, `https` and `tcp`, default is `http`. | diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go index aa95aad8a5..b5f5f5b2d4 100644 --- a/pkg/apisix/resource.go +++ b/pkg/apisix/resource.go @@ -155,12 +155,14 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, error) { Group: clusterName, Name: ups.Desc, }, - Type: ups.LBType, - Key: ups.Key, - HashOn: ups.HashOn, - Nodes: nodes, - Scheme: ups.Scheme, - Checks: ups.Checks, + Type: ups.LBType, + Key: ups.Key, + HashOn: ups.HashOn, + Nodes: nodes, + Scheme: ups.Scheme, + Checks: ups.Checks, + Retries: ups.Retries, + Timeout: ups.Timeout, }, nil } diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go index 137ab99eb7..43f6c68d7b 100644 --- a/pkg/apisix/upstream.go +++ b/pkg/apisix/upstream.go @@ -62,13 +62,15 @@ func (n *upstreamNodes) UnmarshalJSON(p []byte) error { } type upstreamReqBody struct { - LBType string `json:"type"` - HashOn string `json:"hash_on,omitempty"` - Key string `json:"key,omitempty"` - Nodes upstreamNodes `json:"nodes"` - Desc string `json:"desc"` - Scheme string `json:"scheme,omitempty"` - Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"` + LBType string `json:"type"` + HashOn string `json:"hash_on,omitempty"` + Key string `json:"key,omitempty"` + Nodes upstreamNodes `json:"nodes"` + Desc string `json:"desc"` + Scheme string `json:"scheme,omitempty"` + Retries int `json:"retries,omitempty"` + Timeout *v1.UpstreamTimeout `json:"timeout,omitempty"` + Checks *v1.UpstreamHealthCheck `json:"checks,omitempty"` } type upstreamItem upstreamReqBody @@ -192,13 +194,15 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst }) } body, err := json.Marshal(upstreamReqBody{ - LBType: obj.Type, - HashOn: obj.HashOn, - Key: obj.Key, - Nodes: nodes, - Desc: obj.Name, - Scheme: obj.Scheme, - Checks: obj.Checks, + LBType: obj.Type, + HashOn: obj.HashOn, + Key: obj.Key, + Nodes: nodes, + Desc: obj.Name, + Scheme: obj.Scheme, + Checks: obj.Checks, + Retries: obj.Retries, + Timeout: obj.Timeout, }) if err != nil { return nil, err diff --git a/pkg/kube/apisix/apis/config/v1/types.go b/pkg/kube/apisix/apis/config/v1/types.go index 87ba3d7b42..1edc2af3db 100644 --- a/pkg/kube/apisix/apis/config/v1/types.go +++ b/pkg/kube/apisix/apis/config/v1/types.go @@ -105,11 +105,27 @@ type ApisixUpstreamConfig struct { // +optional Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"` + // How many times that the proxy (Apache APISIX) should do when + // errors occur (error, timeout or bad http status codes like 500, 502). + // +optional + Retries int `json:"retries,omitempty" yaml:"retries,omitempty"` + + // Timeout settings for the read, send and connect to the upstream. + // +optional + Timeout *UpstreamTimeout `json:"timeout,omitempty" yaml:"timeout,omitempty"` + // The health check configurtions for the upstream. // +optional HealthCheck *HealthCheck `json:"healthCheck,omitempty" yaml:"healthCheck,omitempty"` } +// UpstreamTimeout is settings for the read, send and connect to the upstream. +type UpstreamTimeout struct { + Connect metav1.Duration `json:"connect,omitempty" yaml:"connect,omitempty"` + Send metav1.Duration `json:"send,omitempty" yaml:"send,omitempty"` + Read metav1.Duration `json:"read,omitempty" yaml:"read,omitempty"` +} + // PortLevelSettings configures the ApisixUpstreamConfig for each individual port. It inherits // configurations from the outer level (the whole Kubernetes Service) and overrides some of // them if they are set on the port level. diff --git a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go index bff68aa9a4..955499b351 100644 --- a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go +++ b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go @@ -64,6 +64,7 @@ func (in *ActiveHealthCheck) DeepCopy() *ActiveHealthCheck { func (in *ActiveHealthCheckHealthy) DeepCopyInto(out *ActiveHealthCheckHealthy) { *out = *in in.PassiveHealthCheckHealthy.DeepCopyInto(&out.PassiveHealthCheckHealthy) + out.Interval = in.Interval return } @@ -81,6 +82,7 @@ func (in *ActiveHealthCheckHealthy) DeepCopy() *ActiveHealthCheckHealthy { func (in *ActiveHealthCheckUnhealthy) DeepCopyInto(out *ActiveHealthCheckUnhealthy) { *out = *in in.PassiveHealthCheckUnhealthy.DeepCopyInto(&out.PassiveHealthCheckUnhealthy) + out.Interval = in.Interval return } @@ -322,6 +324,11 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out *ApisixUpstreamConfig) { *out = new(LoadBalancer) **out = **in } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(UpstreamTimeout) + **out = **in + } if in.HealthCheck != nil { in, out := &in.HealthCheck, &out.HealthCheck *out = new(HealthCheck) @@ -621,3 +628,22 @@ func (in *Rule) DeepCopy() *Rule { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamTimeout) DeepCopyInto(out *UpstreamTimeout) { + *out = *in + out.Connect = in.Connect + out.Send = in.Send + out.Read = in.Read + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamTimeout. +func (in *UpstreamTimeout) DeepCopy() *UpstreamTimeout { + if in == nil { + return nil + } + out := new(UpstreamTimeout) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go index 869713612f..f38a053ef1 100644 --- a/pkg/kube/apisix_upstream.go +++ b/pkg/kube/apisix_upstream.go @@ -19,6 +19,55 @@ import ( apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) +func (t *translator) translateUpstreamRetriesAndTimeout(retries int, timeout *configv1.UpstreamTimeout, ups *apisixv1.Upstream) error { + if retries < 0 { + return &translateError{ + field: "retries", + reason: "invalid value", + } + } + ups.Retries = retries + if timeout == nil { + return nil + } + + // Since the schema of timeout doesn't allow only configuring + // one or two items. Here we assign the default value first. + connTimeout := apisixv1.DefaultUpstreamTimeout + readTimeout := apisixv1.DefaultUpstreamTimeout + sendTimeout := apisixv1.DefaultUpstreamTimeout + if timeout.Connect.Duration < 0 { + return &translateError{ + field: "timeout.connect", + reason: "invalid value", + } + } else if timeout.Connect.Duration > 0 { + connTimeout = int(timeout.Connect.Seconds()) + } + if timeout.Read.Duration < 0 { + return &translateError{ + field: "timeout.read", + reason: "invalid value", + } + } else if timeout.Read.Duration > 0 { + readTimeout = int(timeout.Read.Seconds()) + } + if timeout.Send.Duration < 0 { + return &translateError{ + field: "timeout.send", + reason: "invalid value", + } + } else if timeout.Send.Duration > 0 { + sendTimeout = int(timeout.Send.Seconds()) + } + ups.Timeout = &apisixv1.UpstreamTimeout{ + Connect: connTimeout, + Send: sendTimeout, + Read: readTimeout, + } + return nil +} + func (t *translator) translateUpstreamScheme(scheme string, ups *apisixv1.Upstream) error { if scheme == "" { ups.Scheme = apisixv1.SchemeHTTP diff --git a/pkg/kube/apisix_upstream_test.go b/pkg/kube/apisix_upstream_test.go index db0344a0a4..9815996edc 100644 --- a/pkg/kube/apisix_upstream_test.go +++ b/pkg/kube/apisix_upstream_test.go @@ -369,3 +369,39 @@ func TestTranslateUpstreamActiveHealthCheckUnusually(t *testing.T) { reason: "invalid value", }) } + +func TestUpstreamRetriesAndTimeout(t *testing.T) { + tr := &translator{} + err := tr.translateUpstreamRetriesAndTimeout(-1, nil, nil) + assert.Equal(t, err, &translateError{ + field: "retries", + reason: "invalid value", + }) + + var ups apisixv1.Upstream + err = tr.translateUpstreamRetriesAndTimeout(3, nil, &ups) + assert.Nil(t, err) + assert.Equal(t, ups.Retries, 3) + + timeout := &configv1.UpstreamTimeout{ + Connect: metav1.Duration{Duration: time.Second}, + Read: metav1.Duration{Duration: -1}, + } + err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups) + assert.Equal(t, err, &translateError{ + field: "timeout.read", + reason: "invalid value", + }) + + timeout = &configv1.UpstreamTimeout{ + Connect: metav1.Duration{Duration: time.Second}, + Read: metav1.Duration{Duration: 15 * time.Second}, + } + err = tr.translateUpstreamRetriesAndTimeout(3, timeout, &ups) + assert.Nil(t, err) + assert.Equal(t, ups.Timeout, &apisixv1.UpstreamTimeout{ + Connect: 1, + Send: 60, + Read: 15, + }) +} diff --git a/pkg/kube/translator.go b/pkg/kube/translator.go index 1e940b1d7e..a8e5941829 100644 --- a/pkg/kube/translator.go +++ b/pkg/kube/translator.go @@ -83,6 +83,9 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig) if err := t.translateUpstreamHealthCheck(au.HealthCheck, ups); err != nil { return nil, err } + if err := t.translateUpstreamRetriesAndTimeout(au.Retries, au.Timeout, ups); err != nil { + return nil, err + } return ups, nil } diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index 1186fe564a..c41a18436e 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -62,6 +62,9 @@ const ( // ActiveHealthCheckMinInterval is the minimum interval for // the active health check. ActiveHealthCheckMinInterval = time.Second + + // Default connect, read and send timeout (in seconds) with upstreams. + DefaultUpstreamTimeout = 60 ) // Metadata contains all meta information about resources. @@ -130,6 +133,18 @@ type Upstream struct { Nodes []UpstreamNode `json:"nodes,omitempty" yaml:"nodes,omitempty"` FromKind string `json:"from_kind,omitempty" yaml:"from_kind,omitempty"` Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"` + Retries int `json:"retries,omitempty" yaml:"retries,omitempty"` + Timeout *UpstreamTimeout `json:"timeout,omitempty" yaml:"timeout,omitempty"` +} + +// UpstreamTimeout represents the timeout settings on Upstream. +type UpstreamTimeout struct { + // Connect is the connect timeout + Connect int `json:"connect" yaml:"connect"` + // Send is the send timeout + Send int `json:"send" yaml:"send"` + // Read is the read timeout + Read int `json:"read" yaml:"read"` } // Node the node in upstream diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go index 41b461c4b2..51869da95a 100644 --- a/pkg/types/apisix/v1/zz_generated.deepcopy.go +++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go @@ -95,6 +95,11 @@ func (in *Upstream) DeepCopyInto(out *Upstream) { *out = make([]UpstreamNode, len(*in)) copy(*out, *in) } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(UpstreamTimeout) + **out = **in + } return } diff --git a/test/e2e/features/retries_timeout.go b/test/e2e/features/retries_timeout.go new file mode 100644 index 0000000000..9e3d757f5d --- /dev/null +++ b/test/e2e/features/retries_timeout.go @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package features + +import ( + "fmt" + "time" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" + "github.com/onsi/ginkgo" + "github.com/stretchr/testify/assert" +) + +var _ = ginkgo.Describe("retries", func() { + s := scaffold.NewDefaultScaffold() + ginkgo.It("active check", func() { + backendSvc, backendPorts := s.DefaultHTTPBackend() + + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: %s +spec: + retries: 3 +`, backendSvc) + err := s.CreateResourceFromString(au) + assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream") + time.Sleep(2 * time.Second) + + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + rules: + - host: httpbin.org + http: + paths: + - backend: + serviceName: %s + servicePort: %d + path: /* +`, backendSvc, backendPorts[0]) + err = s.CreateResourceFromString(ar) + assert.Nil(ginkgo.GinkgoT(), err) + time.Sleep(5 * time.Second) + + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Equal(ginkgo.GinkgoT(), ups[0].Retries, 3) + }) +}) + +var _ = ginkgo.Describe("timeout", func() { + s := scaffold.NewDefaultScaffold() + ginkgo.It("active check", func() { + backendSvc, backendPorts := s.DefaultHTTPBackend() + + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixUpstream +metadata: + name: %s +spec: + timeout: + read: 10s + send: 10s +`, backendSvc) + err := s.CreateResourceFromString(au) + assert.Nil(ginkgo.GinkgoT(), err, "create ApisixUpstream") + time.Sleep(2 * time.Second) + + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v1 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + rules: + - host: httpbin.org + http: + paths: + - backend: + serviceName: %s + servicePort: %d + path: /* +`, backendSvc, backendPorts[0]) + err = s.CreateResourceFromString(ar) + assert.Nil(ginkgo.GinkgoT(), err) + time.Sleep(5 * time.Second) + + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 1) + assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Connect, 60) + assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Read, 10) + assert.Equal(ginkgo.GinkgoT(), ups[0].Timeout.Send, 10) + }) +})