From e0518a4b354c80aaa7b15ab4f6de4f0cf3a13e52 Mon Sep 17 00:00:00 2001 From: Sarasa Kisaragi Date: Wed, 2 Mar 2022 19:51:19 +0800 Subject: [PATCH] fix: check if stream_routes is disabled (#868) --- pkg/apisix/cluster.go | 35 +++++++++++-- pkg/apisix/noop.go | 51 +++++++++++++++++++ pkg/apisix/stream_route.go | 8 ++- test/e2e/ingress/sanity.go | 46 +++++++++++++++++ test/e2e/testdata/apisix-stream-disabled.yaml | 42 +++++++++++++++ 5 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 pkg/apisix/noop.go create mode 100644 test/e2e/testdata/apisix-stream-disabled.yaml diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go index 8b7a938834..20565bb396 100644 --- a/pkg/apisix/cluster.go +++ b/pkg/apisix/cluster.go @@ -53,6 +53,8 @@ var ( // ErrDuplicatedCluster means the cluster adding request was // rejected since the cluster was already created. ErrDuplicatedCluster = errors.New("duplicated cluster") + // ErrFunctionDisabled means the APISIX function is disabled + ErrFunctionDisabled = errors.New("function disabled") _errReadOnClosedResBody = errors.New("http: read on closed response body") @@ -517,6 +519,10 @@ func (c *cluster) do(req *http.Request) (*http.Response, error) { return c.cli.Do(req) } +func (c *cluster) isFunctionDisabled(body string) bool { + return strings.Contains(body, "is disabled") +} + func (c *cluster) getResource(ctx context.Context, url, resource string) (*getResponse, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -532,11 +538,15 @@ func (c *cluster) getResource(ctx context.Context, url, resource string) (*getRe defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusOK { + body := readBody(resp.Body, url) + if c.isFunctionDisabled(body) { + return nil, ErrFunctionDisabled + } if resp.StatusCode == http.StatusNotFound { return nil, cache.ErrNotFound } else { err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) - err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + err = multierr.Append(err, fmt.Errorf("error message: %s", body)) } return nil, err } @@ -565,8 +575,12 @@ func (c *cluster) listResource(ctx context.Context, url, resource string) (*list defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusOK { + body := readBody(resp.Body, url) + if c.isFunctionDisabled(body) { + return nil, ErrFunctionDisabled + } err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) - err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + err = multierr.Append(err, fmt.Errorf("error message: %s", body)) return nil, err } @@ -595,8 +609,12 @@ func (c *cluster) createResource(ctx context.Context, url, resource string, body defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + body := readBody(resp.Body, url) + if c.isFunctionDisabled(body) { + return nil, ErrFunctionDisabled + } err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) - err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + err = multierr.Append(err, fmt.Errorf("error message: %s", body)) return nil, err } @@ -624,8 +642,12 @@ func (c *cluster) updateResource(ctx context.Context, url, resource string, body defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body := readBody(resp.Body, url) + if c.isFunctionDisabled(body) { + return nil, ErrFunctionDisabled + } err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) - err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url))) + err = multierr.Append(err, fmt.Errorf("error message: %s", body)) return nil, err } var ur updateResponse @@ -652,8 +674,11 @@ func (c *cluster) deleteResource(ctx context.Context, url, resource string) erro defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { - err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) message := readBody(resp.Body, url) + if c.isFunctionDisabled(message) { + return ErrFunctionDisabled + } + err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode)) err = multierr.Append(err, fmt.Errorf("error message: %s", message)) if strings.Contains(message, "still using") { return cache.ErrStillInUse diff --git a/pkg/apisix/noop.go b/pkg/apisix/noop.go new file mode 100644 index 0000000000..af7dd87dee --- /dev/null +++ b/pkg/apisix/noop.go @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package apisix + +import ( + "context" + + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" +) + +var ( + _ StreamRoute = (*noopClient)(nil) +) + +type noopClient struct { +} + +func (r *noopClient) Get(ctx context.Context, name string) (*v1.StreamRoute, error) { + return nil, nil +} + +func (r *noopClient) List(ctx context.Context) ([]*v1.StreamRoute, error) { + return nil, nil +} + +func (r *noopClient) Create(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) { + return nil, nil +} + +func (r *noopClient) Delete(ctx context.Context, obj *v1.StreamRoute) error { + return nil +} + +func (r *noopClient) Update(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) { + return nil, nil +} diff --git a/pkg/apisix/stream_route.go b/pkg/apisix/stream_route.go index e835eabab7..f6377c8d78 100644 --- a/pkg/apisix/stream_route.go +++ b/pkg/apisix/stream_route.go @@ -33,8 +33,14 @@ type streamRouteClient struct { } func newStreamRouteClient(c *cluster) StreamRoute { + url := c.baseURL + "/stream_routes" + _, err := c.listResource(context.Background(), url, "streamRoute") + if err == ErrFunctionDisabled { + log.Infow("resource stream_routes is disabled") + return &noopClient{} + } return &streamRouteClient{ - url: c.baseURL + "/stream_routes", + url: url, cluster: c, } } diff --git a/test/e2e/ingress/sanity.go b/test/e2e/ingress/sanity.go index ff3dd45927..08247f752d 100644 --- a/test/e2e/ingress/sanity.go +++ b/test/e2e/ingress/sanity.go @@ -186,3 +186,49 @@ var _ = ginkgo.Describe("leader election", func() { } }) }) + +var _ = ginkgo.Describe("stream_routes disabled", func() { + opts := &scaffold.Options{ + Name: "default", + Kubeconfig: scaffold.GetKubeconfig(), + APISIXConfigPath: "testdata/apisix-stream-disabled.yaml", + IngressAPISIXReplicas: 1, + HTTPBinServicePort: 80, + APISIXRouteVersion: "apisix.apache.org/v2beta3", + } + s := scaffold.NewScaffold(opts) + ginkgo.It("/ip should return your ip", func() { + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2beta3 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + paths: + - /ip + backends: + - serviceName: %s + servicePort: %d +`, backendSvc, backendSvcPort[0]) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ar)) + err := s.EnsureNumApisixRoutesCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes") + err = s.EnsureNumApisixUpstreamsCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "checking number of upstreams") + + // TODO When ingress controller can feedback the lifecycle of CRDs to the + // status field, we can poll it rather than sleeping. + time.Sleep(3 * time.Second) + + body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw() + var placeholder ip + err = json.Unmarshal([]byte(body), &placeholder) + assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP") + // It's not our focus point to check the IP address returned by httpbin, + // so here skip the IP address validation. + }) +}) diff --git a/test/e2e/testdata/apisix-stream-disabled.yaml b/test/e2e/testdata/apisix-stream-disabled.yaml new file mode 100644 index 0000000000..458b707a25 --- /dev/null +++ b/test/e2e/testdata/apisix-stream-disabled.yaml @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# PLEASE DO NOT UPDATE THIS FILE! +# If you want to set the specified configuration value, you can set the new +# value in the conf/config.yaml file. +# + +apisix: + enable_control: true + enable_reuseport: true # Enable nginx SO_REUSEPORT switch if set to true. + allow_admin: + - 127.0.0.0/24 + - 0.0.0.0/0 + port_admin: 9180 +# stream_proxy: # TCP/UDP proxy +# only: false +# tcp: # TCP proxy port list +# - 9100 +# udp: +# - 9200 +etcd: + host: # it's possible to define multiple etcd hosts addresses of the same etcd cluster. + - "http://{{ .EtcdServiceFQDN }}:2379" # multiple etcd address + prefix: "/apisix" # apisix configurations prefix + timeout: 30 # 30 seconds +plugin_attr: + prometheus: + enable_export_server: false