Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add full compare when ingress startup #680

Merged
merged 15 commits into from
Sep 24, 2021
1 change: 1 addition & 0 deletions pkg/apisix/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apisix

import (
"context"

"go.uber.org/zap"

"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
Expand Down
271 changes: 271 additions & 0 deletions pkg/ingress/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"context"
"sync"
"time"

"C"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apisix "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

// CompareResources use to compare the object IDs in resources and APISIX
// Find out the rest of objects in APISIX
// AND remove them.
func (c *Controller) CompareResources() {
var (
routeMapK8S = new(sync.Map)
streamRouteMapK8S = new(sync.Map)
upstreamMapK8S = new(sync.Map)
sslMapK8S = new(sync.Map)
consumerMapK8S = new(sync.Map)

routeMapA6 = new(sync.Map)
streamRouteMapA6 = new(sync.Map)
upstreamMapA6 = new(sync.Map)
sslMapA6 = new(sync.Map)
consumerMapA6 = new(sync.Map)
)
// todo if watchingNamespace == nil
if c.watchingNamespace == nil {
opts := v1.ListOptions{}
// list all apisixroute resources in all namespaces
nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(context.TODO(), opts)
if err != nil {
panic(err)
} else {
wns := make(map[string]struct{}, len(nsList.Items))
for _, v := range nsList.Items {
wns[v.Name] = struct{}{}
}
c.watchingNamespace = wns
}
}
for ns := range c.watchingNamespace {
// ApisixRoute
opts := v1.ListOptions{}
retRoutes, err := c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(context.TODO(), opts)
if err != nil {
panic(err)
} else {
for _, r := range retRoutes.Items {
tc, err := c.translator.TranslateRouteV2beta1NotStrictly(&r)
if err != nil {
panic(err)
} else {
// routes
for _, route := range tc.Routes {
routeMapK8S.Store(route.ID, route.ID)
}
// streamRoutes
for _, stRoute := range tc.StreamRoutes {
streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
}
// upstreams
for _, upstream := range tc.Upstreams {
upstreamMapK8S.Store(upstream.ID, upstream.ID)
}
// ssl
for _, ssl := range tc.SSL {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
}
// todo ApisixUpstream
// ApisixUpstream should be synced with ApisixRoute resource

// ApisixSSL
retSSL, err := c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(context.TODO(), opts)
if err != nil {
panic(err)
} else {
for _, s := range retSSL.Items {
ssl, err := c.translator.TranslateSSL(&s)
if err != nil {
panic(err)
} else {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
// ApisixConsumer
retConsumer, err := c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(context.TODO(), opts)
if err != nil {
panic(err)
} else {
for _, con := range retConsumer.Items {
consumer, err := c.translator.TranslateApisixConsumer(&con)
if err != nil {
panic(err)
} else {
consumerMapK8S.Store(consumer.Username, consumer.Username)
}
}
}
}

// 2.get all cache routes
c.listRouteCache(routeMapA6)
c.listStreamRouteCache(streamRouteMapA6)
c.listUpstreamCache(upstreamMapA6)
c.listSSLCache(sslMapA6)
c.listConsumerCache(consumerMapA6)
// 3.compare
routeReult := findRedundant(routeMapA6, routeMapK8S)
streamRouteReult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
upstreamReult := findRedundant(upstreamMapA6, upstreamMapK8S)
sslReult := findRedundant(sslMapA6, sslMapK8S)
consuemrReult := findRedundant(consumerMapA6, consumerMapK8S)
// 4.remove from APISIX
c.removeRouteFromA6(routeReult)
c.removeStreamRouteFromA6(streamRouteReult)
c.removeSSLFromA6(sslReult)
c.removeConsumerFromA6(consuemrReult)
time.Sleep(5 * time.Second)
c.removeUpstreamFromA6(upstreamReult)
}

// findRedundant find redundant item which in src and do not in dest
func findRedundant(src, dest *sync.Map) *sync.Map {
result := new(sync.Map)
src.Range(func(k, v interface{}) bool {
_, ok := dest.Load(k)
if !ok {
result.Store(k, v)
}
return true
})
return result
}

func (c *Controller) removeConsumerFromA6(consumers *sync.Map) {
consumers.Range(func(k, v interface{}) bool {
r := &apisix.Consumer{}
r.Username = k.(string)
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().Delete(context.TODO(), r)
if err != nil {
panic(err)
}
return true
})
}

func (c *Controller) removeSSLFromA6(sslReult *sync.Map) {
sslReult.Range(func(k, v interface{}) bool {
r := &apisix.Ssl{}
r.ID = k.(string)
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().Delete(context.TODO(), r)
if err != nil {
panic(err)
}
return true
})
}

func (c *Controller) removeUpstreamFromA6(upstreamReult *sync.Map) {
upstreamReult.Range(func(k, v interface{}) bool {
r := &apisix.Upstream{}
r.ID = k.(string)
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().Delete(context.TODO(), r)
if err != nil {
panic(err)
}
return true
})
}

func (c *Controller) removeStreamRouteFromA6(streamRouteReult *sync.Map) {
streamRouteReult.Range(func(k, v interface{}) bool {
r := &apisix.StreamRoute{}
r.ID = k.(string)
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().Delete(context.TODO(), r)
if err != nil {
panic(err)
}
return true
})
}

func (c *Controller) removeRouteFromA6(routeReult *sync.Map) {
routeReult.Range(func(k, v interface{}) bool {
r := &apisix.Route{}
r.ID = k.(string)
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().Delete(context.TODO(), r)
if err != nil {
panic(err)
}
return true
})
}

func (c *Controller) listRouteCache(routeMapA6 *sync.Map) {
routesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(context.TODO())
if err != nil {
panic(err)
} else {
for _, ra := range routesInA6 {
routeMapA6.Store(ra.ID, ra.ID)
}
}
}

func (c *Controller) listStreamRouteCache(streamRouteMapA6 *sync.Map) {
streamRoutesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(context.TODO())
if err != nil {
panic(err)
} else {
for _, ra := range streamRoutesInA6 {
streamRouteMapA6.Store(ra.ID, ra.ID)
}
}
}

func (c *Controller) listUpstreamCache(upstreamMapA6 *sync.Map) {
upstreamsInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(context.TODO())
if err != nil {
panic(err)
} else {
for _, ra := range upstreamsInA6 {
upstreamMapA6.Store(ra.ID, ra.ID)
}
}
}

func (c *Controller) listSSLCache(sslMapA6 *sync.Map) {
sslInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(context.TODO())
if err != nil {
panic(err)
} else {
for _, s := range sslInA6 {
sslMapA6.Store(s.ID, s.ID)
}
}
}

func (c *Controller) listConsumerCache(consumerMapA6 *sync.Map) {
consumerInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(context.TODO())
if err != nil {
panic(err)
} else {
for _, con := range consumerInA6 {
consumerMapA6.Store(con.Username, con.Username)
}
}
}
4 changes: 4 additions & 0 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ func (c *Controller) run(ctx context.Context) {

c.initWhenStartLeading()

// compare resources of k8s with objects of APISIX
c.CompareResources()

c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
Expand All @@ -416,6 +419,7 @@ func (c *Controller) run(ctx context.Context) {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {

c.apisixRouteInformer.Run(ctx.Done())
})
c.goAttach(func() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
return
}
log.Debugw("pod update event arrived",
zap.Any("final state", pod),
zap.Any("pod namespace", pod.Namespace),
zap.Any("pod name", pod.Name),
)
if pod.DeletionTimestamp != nil {
if err := c.controller.podCache.Delete(pod); err != nil {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gruntwork-io/terratest v0.32.8
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.10.1
github.com/stretchr/testify v1.7.0
k8s.io/api v0.21.1
k8s.io/apimachinery v0.21.1
Expand Down
76 changes: 76 additions & 0 deletions test/e2e/ingress/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var _ = ginkgo.Describe("Testing compare resources", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta1",
}
s := scaffold.NewScaffold(opts)
ginkgo.It("Compare and find out the redundant objects in APISIX, and remove them", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta1
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.com
paths:
- /ip
backend:
serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))

err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
// scale Ingres Controller --replicas=0
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), "scaling ingress controller instances = 0")
// remove ApisixRoute resource
assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
// scale Ingres Controller --replicas=1
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), "scaling ingress controller instances = 1")
time.Sleep(15 * time.Second)
routes, err := s.ListApisixRoutes()
assert.Nil(ginkgo.GinkgoT(), err, "list routes error")
assert.Len(ginkgo.GinkgoT(), routes, 0, "route should be removed")
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups, 0, "upstream should be removed")
})
})
Loading