Skip to content

Commit

Permalink
fix: check if stream_routes is disabled (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel authored and tao12345666333 committed Apr 22, 2022
1 parent 90dd10e commit e0518a4
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 6 deletions.
35 changes: 30 additions & 5 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
// ErrDuplicatedCluster means the cluster adding request was
// rejected since the cluster was already created.
ErrDuplicatedCluster = errors.New("duplicated cluster")
// ErrFunctionDisabled means the APISIX function is disabled
ErrFunctionDisabled = errors.New("function disabled")

_errReadOnClosedResBody = errors.New("http: read on closed response body")

Expand Down Expand Up @@ -517,6 +519,10 @@ func (c *cluster) do(req *http.Request) (*http.Response, error) {
return c.cli.Do(req)
}

func (c *cluster) isFunctionDisabled(body string) bool {
return strings.Contains(body, "is disabled")
}

func (c *cluster) getResource(ctx context.Context, url, resource string) (*getResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -532,11 +538,15 @@ func (c *cluster) getResource(ctx context.Context, url, resource string) (*getRe

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
body := readBody(resp.Body, url)
if c.isFunctionDisabled(body) {
return nil, ErrFunctionDisabled
}
if resp.StatusCode == http.StatusNotFound {
return nil, cache.ErrNotFound
} else {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
err = multierr.Append(err, fmt.Errorf("error message: %s", body))
}
return nil, err
}
Expand Down Expand Up @@ -565,8 +575,12 @@ func (c *cluster) listResource(ctx context.Context, url, resource string) (*list

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
body := readBody(resp.Body, url)
if c.isFunctionDisabled(body) {
return nil, ErrFunctionDisabled
}
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
err = multierr.Append(err, fmt.Errorf("error message: %s", body))
return nil, err
}

Expand Down Expand Up @@ -595,8 +609,12 @@ func (c *cluster) createResource(ctx context.Context, url, resource string, body
defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
body := readBody(resp.Body, url)
if c.isFunctionDisabled(body) {
return nil, ErrFunctionDisabled
}
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
err = multierr.Append(err, fmt.Errorf("error message: %s", body))
return nil, err
}

Expand Down Expand Up @@ -624,8 +642,12 @@ func (c *cluster) updateResource(ctx context.Context, url, resource string, body
defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body := readBody(resp.Body, url)
if c.isFunctionDisabled(body) {
return nil, ErrFunctionDisabled
}
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
err = multierr.Append(err, fmt.Errorf("error message: %s", body))
return nil, err
}
var ur updateResponse
Expand All @@ -652,8 +674,11 @@ func (c *cluster) deleteResource(ctx context.Context, url, resource string) erro
defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
message := readBody(resp.Body, url)
if c.isFunctionDisabled(message) {
return ErrFunctionDisabled
}
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
err = multierr.Append(err, fmt.Errorf("error message: %s", message))
if strings.Contains(message, "still using") {
return cache.ErrStillInUse
Expand Down
51 changes: 51 additions & 0 deletions pkg/apisix/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package apisix

import (
"context"

v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

var (
_ StreamRoute = (*noopClient)(nil)
)

type noopClient struct {
}

func (r *noopClient) Get(ctx context.Context, name string) (*v1.StreamRoute, error) {
return nil, nil
}

func (r *noopClient) List(ctx context.Context) ([]*v1.StreamRoute, error) {
return nil, nil
}

func (r *noopClient) Create(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
return nil, nil
}

func (r *noopClient) Delete(ctx context.Context, obj *v1.StreamRoute) error {
return nil
}

func (r *noopClient) Update(ctx context.Context, obj *v1.StreamRoute) (*v1.StreamRoute, error) {
return nil, nil
}
8 changes: 7 additions & 1 deletion pkg/apisix/stream_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ type streamRouteClient struct {
}

func newStreamRouteClient(c *cluster) StreamRoute {
url := c.baseURL + "/stream_routes"
_, err := c.listResource(context.Background(), url, "streamRoute")
if err == ErrFunctionDisabled {
log.Infow("resource stream_routes is disabled")
return &noopClient{}
}
return &streamRouteClient{
url: c.baseURL + "/stream_routes",
url: url,
cluster: c,
}
}
Expand Down
46 changes: 46 additions & 0 deletions test/e2e/ingress/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,49 @@ var _ = ginkgo.Describe("leader election", func() {
}
})
})

var _ = ginkgo.Describe("stream_routes disabled", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-stream-disabled.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta3",
}
s := scaffold.NewScaffold(opts)
ginkgo.It("/ip should return your ip", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
ar := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
paths:
- /ip
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ar))
err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "checking number of upstreams")

// TODO When ingress controller can feedback the lifecycle of CRDs to the
// status field, we can poll it rather than sleeping.
time.Sleep(3 * time.Second)

body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
// It's not our focus point to check the IP address returned by httpbin,
// so here skip the IP address validation.
})
})
42 changes: 42 additions & 0 deletions test/e2e/testdata/apisix-stream-disabled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# PLEASE DO NOT UPDATE THIS FILE!
# If you want to set the specified configuration value, you can set the new
# value in the conf/config.yaml file.
#

apisix:
enable_control: true
enable_reuseport: true # Enable nginx SO_REUSEPORT switch if set to true.
allow_admin:
- 127.0.0.0/24
- 0.0.0.0/0
port_admin: 9180
# stream_proxy: # TCP/UDP proxy
# only: false
# tcp: # TCP proxy port list
# - 9100
# udp:
# - 9200
etcd:
host: # it's possible to define multiple etcd hosts addresses of the same etcd cluster.
- "http://{{ .EtcdServiceFQDN }}:2379" # multiple etcd address
prefix: "/apisix" # apisix configurations prefix
timeout: 30 # 30 seconds
plugin_attr:
prometheus:
enable_export_server: false

0 comments on commit e0518a4

Please sign in to comment.