From 5c798213da804493d3664ae4bc39dfceb9686f0d Mon Sep 17 00:00:00 2001 From: Sarasa Kisaragi Date: Thu, 20 Oct 2022 17:15:41 +0800 Subject: [PATCH] feat: support external service (#1306) --- pkg/kube/apisix/apis/config/v2/types.go | 75 ++- .../apis/config/v2/zz_generated.deepcopy.go | 72 +++ pkg/kube/apisix_upstream.go | 12 +- pkg/providers/apisix/apisix_route.go | 268 ++++++-- pkg/providers/apisix/apisix_upstream.go | 326 +++++++++- pkg/providers/apisix/provider.go | 22 +- .../apisix/translation/apisix_plugin.go | 2 +- .../apisix/translation/apisix_route.go | 215 +++++-- .../apisix/translation/apisix_route_test.go | 245 ++++++++ .../apisix/translation/apisix_upstream.go | 108 ++++ .../apisix/translation/translator.go | 8 +- pkg/providers/utils/string.go | 8 + pkg/types/apisix/v1/types.go | 5 + samples/deploy/crd/v1/ApisixRoute.yaml | 15 +- samples/deploy/crd/v1/ApisixUpstream.yaml | 13 + test/e2e/go.mod | 2 +- test/e2e/scaffold/k8s.go | 13 +- test/e2e/scaffold/scaffold.go | 14 + test/e2e/suite-features/external-service.go | 582 ++++++++++++++++++ 19 files changed, 1880 insertions(+), 125 deletions(-) create mode 100644 test/e2e/suite-features/external-service.go diff --git a/pkg/kube/apisix/apis/config/v2/types.go b/pkg/kube/apisix/apis/config/v2/types.go index 0225c0f2f6..f01f57e711 100644 --- a/pkg/kube/apisix/apis/config/v2/types.go +++ b/pkg/kube/apisix/apis/config/v2/types.go @@ -67,14 +67,17 @@ type ApisixRouteHTTP struct { // Backends represents potential backends to proxy after the route // rule matched. When number of backends are more than one, traffic-split // plugin in APISIX will be used to split traffic based on the backend weight. - Backends []ApisixRouteHTTPBackend `json:"backends,omitempty" yaml:"backends,omitempty"` + Backends []ApisixRouteHTTPBackend `json:"backends,omitempty" yaml:"backends,omitempty"` + // Upstreams refer to ApisixUpstream CRD + Upstreams []ApisixRouteUpstreamReference `json:"upstreams,omitempty" yaml:"upstreams,omitempty"` + Websocket bool `json:"websocket" yaml:"websocket"` PluginConfigName string `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"` Plugins []ApisixRoutePlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"` Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"` } -// ApisixRouteHTTPBackend represents a HTTP backend (a Kuberentes Service). +// ApisixRouteHTTPBackend represents an HTTP backend (a Kubernetes Service). type ApisixRouteHTTPBackend struct { // The name (short) of the service, note cross namespace is forbidden, // so be sure the ApisixRoute and Service are in the same namespace. @@ -93,6 +96,13 @@ type ApisixRouteHTTPBackend struct { Subset string `json:"subset,omitempty" yaml:"subset,omitempty"` } +// ApisixRouteUpstreamReference contains a ApisixUpstream CRD reference +type ApisixRouteUpstreamReference struct { + Name string `json:"name,omitempty" yaml:"name"` + // +optional + Weight *int `json:"weight,omitempty" yaml:"weight"` +} + // ApisixRouteHTTPMatch represents the match condition for hitting this route. type ApisixRouteHTTPMatch struct { // URI path predicates, at least one path should be @@ -166,6 +176,20 @@ type ApisixRoutePlugin struct { // any plugins. type ApisixRoutePluginConfig map[string]interface{} +func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) { + b, _ := json.Marshal(&p) + _ = json.Unmarshal(b, out) +} + +func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig { + if p == nil { + return nil + } + out := new(ApisixRoutePluginConfig) + p.DeepCopyInto(out) + return out +} + // ApisixRouteAuthentication is the authentication-related // configuration in ApisixRoute. type ApisixRouteAuthentication struct { @@ -189,20 +213,6 @@ type ApisixRouteAuthenticationJwtAuth struct { Cookie string `json:"cookie,omitempty" yaml:"cookie,omitempty"` } -func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) { - b, _ := json.Marshal(&p) - _ = json.Unmarshal(b, out) -} - -func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig { - if p == nil { - return nil - } - out := new(ApisixRoutePluginConfig) - p.DeepCopyInto(out) - return out -} - // ApisixRouteStream is the configuration for level 4 route type ApisixRouteStream struct { // The rule name, cannot be empty. @@ -238,7 +248,6 @@ type ApisixRouteStreamBackend struct { } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - // ApisixRouteList contains a list of ApisixRoute. type ApisixRouteList struct { metav1.TypeMeta `json:",inline" yaml:",inline"` @@ -250,7 +259,6 @@ type ApisixRouteList struct { // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:subresource:status - // ApisixClusterConfig is the Schema for the ApisixClusterConfig resource. // An ApisixClusterConfig is used to identify an APISIX cluster, it's a // ClusterScoped resource so the name is unique. @@ -310,7 +318,6 @@ type ApisixClusterAdminConfig struct { } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - // ApisixClusterConfigList contains a list of ApisixClusterConfig. type ApisixClusterConfigList struct { metav1.TypeMeta `json:",inline" yaml:",inline"` @@ -322,7 +329,6 @@ type ApisixClusterConfigList struct { // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:subresource:status - // ApisixConsumer is the Schema for the ApisixConsumer resource. // An ApisixConsumer is used to identify a consumer. type ApisixConsumer struct { @@ -418,7 +424,6 @@ type ApisixConsumerHMACAuthValue struct { } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - // ApisixConsumerList contains a list of ApisixConsumer. type ApisixConsumerList struct { metav1.TypeMeta `json:",inline" yaml:",inline"` @@ -443,13 +448,18 @@ type ApisixUpstream struct { // ApisixUpstreamSpec describes the specification of ApisixUpstream. type ApisixUpstreamSpec struct { + // ExternalNodes contains external nodes the Upstream should use + // If this field is set, the upstream will use these nodes directly without any further resolves + // +optional + ExternalNodes []ApisixUpstreamExternalNode `json:"externalNodes,omitempty" yaml:"externalNodes,omitempty"` + ApisixUpstreamConfig `json:",inline" yaml:",inline"` PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"` } // ApisixUpstreamConfig contains rich features on APISIX Upstream, for instance -// load balancer, health check and etc. +// load balancer, health check, etc. type ApisixUpstreamConfig struct { // LoadBalancer represents the load balancer configuration for Kubernetes Service. // The default strategy is round robin. @@ -483,6 +493,27 @@ type ApisixUpstreamConfig struct { Subsets []ApisixUpstreamSubset `json:"subsets,omitempty" yaml:"subsets,omitempty"` } +// ApisixUpstreamExternalType is the external service type +type ApisixUpstreamExternalType string + +const ( + // ExternalTypeDomain type is a domain + // +k8s:deepcopy-gen=false + ExternalTypeDomain ApisixUpstreamExternalType = "Domain" + + // ExternalTypeService type is a K8s ExternalName service + // +k8s:deepcopy-gen=false + ExternalTypeService ApisixUpstreamExternalType = "Service" +) + +// ApisixUpstreamExternalNode is the external node conf +type ApisixUpstreamExternalNode struct { + Name string `json:"name,omitempty" yaml:"name"` + Type ApisixUpstreamExternalType `json:"type,omitempty" yaml:"type"` + // +optional + Weight *int `json:"weight,omitempty" yaml:"weight"` +} + // ApisixUpstreamSubset defines a single endpoints group of one Service. type ApisixUpstreamSubset struct { // Name is the name of subset. diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go index f48ac7aa24..339a15f3a1 100644 --- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go +++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go @@ -781,6 +781,13 @@ func (in *ApisixRouteHTTP) DeepCopyInto(out *ApisixRouteHTTP) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Upstreams != nil { + in, out := &in.Upstreams, &out.Upstreams + *out = make([]ApisixRouteUpstreamReference, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Plugins != nil { in, out := &in.Plugins, &out.Plugins *out = make([]ApisixRoutePlugin, len(*in)) @@ -1048,6 +1055,27 @@ func (in *ApisixRouteStreamMatch) DeepCopy() *ApisixRouteStreamMatch { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApisixRouteUpstreamReference) DeepCopyInto(out *ApisixRouteUpstreamReference) { + *out = *in + if in.Weight != nil { + in, out := &in.Weight, &out.Weight + *out = new(int) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteUpstreamReference. +func (in *ApisixRouteUpstreamReference) DeepCopy() *ApisixRouteUpstreamReference { + if in == nil { + return nil + } + out := new(ApisixRouteUpstreamReference) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApisixSecret) DeepCopyInto(out *ApisixSecret) { *out = *in @@ -1259,6 +1287,43 @@ func (in *ApisixUpstreamConfig) DeepCopy() *ApisixUpstreamConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApisixUpstreamExternalNode) DeepCopyInto(out *ApisixUpstreamExternalNode) { + *out = *in + if in.Weight != nil { + in, out := &in.Weight, &out.Weight + *out = new(int) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstreamExternalNode. +func (in *ApisixUpstreamExternalNode) DeepCopy() *ApisixUpstreamExternalNode { + if in == nil { + return nil + } + out := new(ApisixUpstreamExternalNode) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApisixUpstreamExternalType) DeepCopyInto(out *ApisixUpstreamExternalType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstreamExternalType. +func (in *ApisixUpstreamExternalType) DeepCopy() *ApisixUpstreamExternalType { + if in == nil { + return nil + } + out := new(ApisixUpstreamExternalType) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApisixUpstreamList) DeepCopyInto(out *ApisixUpstreamList) { *out = *in @@ -1295,6 +1360,13 @@ func (in *ApisixUpstreamList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApisixUpstreamSpec) DeepCopyInto(out *ApisixUpstreamSpec) { *out = *in + if in.ExternalNodes != nil { + in, out := &in.ExternalNodes, &out.ExternalNodes + *out = make([]ApisixUpstreamExternalNode, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig) if in.PortLevelSettings != nil { in, out := &in.PortLevelSettings, &out.PortLevelSettings diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go index b3da976737..2ac51791a5 100644 --- a/pkg/kube/apisix_upstream.go +++ b/pkg/kube/apisix_upstream.go @@ -17,6 +17,8 @@ package kube import ( "errors" + "k8s.io/apimachinery/pkg/labels" + "github.com/apache/apisix-ingress-controller/pkg/config" configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3" @@ -28,9 +30,11 @@ import ( // it aims at to be compatible with different ApisixUpstream versions. type ApisixUpstreamLister interface { // V2beta3 gets the ApisixUpstream in apisix.apache.org/v2beta3. - V2beta3(string, string) (ApisixUpstream, error) + V2beta3(namespace, name string) (ApisixUpstream, error) // V2 gets the ApisixUpstream in apisix.apache.org/v2. - V2(string, string) (ApisixUpstream, error) + V2(namespace, name string) (ApisixUpstream, error) + // ListV2 gets v2.ApisixUpstreams + ListV2(namespace string) ([]*configv2.ApisixUpstream, error) } // ApisixUpstreamInformer is an encapsulation for the informer of ApisixUpstream, @@ -120,6 +124,10 @@ func (l *apisixUpstreamLister) V2(namespace, name string) (ApisixUpstream, error }, nil } +func (l *apisixUpstreamLister) ListV2(namespace string) ([]*configv2.ApisixUpstream, error) { + return l.v2Lister.ApisixUpstreams(namespace).List(labels.Everything()) +} + // MustNewApisixUpstream creates a kube.ApisixUpstream object according to the // type of obj. func MustNewApisixUpstream(obj interface{}) ApisixUpstream { diff --git a/pkg/providers/apisix/apisix_route.go b/pkg/providers/apisix/apisix_route.go index 81eec188bd..b11539adef 100644 --- a/pkg/providers/apisix/apisix_route.go +++ b/pkg/providers/apisix/apisix_route.go @@ -44,28 +44,43 @@ import ( type apisixRouteController struct { *apisixCommon - workqueue workqueue.RateLimitingInterface - workers int + workqueue workqueue.RateLimitingInterface + relatedWorkqueue workqueue.RateLimitingInterface + workers int - svcInformer cache.SharedIndexInformer - apisixRouteLister kube.ApisixRouteLister - apisixRouteInformer cache.SharedIndexInformer + svcInformer cache.SharedIndexInformer + apisixRouteLister kube.ApisixRouteLister + apisixRouteInformer cache.SharedIndexInformer + apisixUpstreamInformer cache.SharedIndexInformer svcLock sync.RWMutex - svcMap map[string]map[string]struct{} + // service key -> apisix route key + svcMap map[string]map[string]struct{} + + apisixUpstreamLock sync.RWMutex + // apisix upstream key -> apisix route key + apisixUpstreamMap map[string]map[string]struct{} +} + +type routeEvent struct { + Key string + Type string } func newApisixRouteController(common *apisixCommon, apisixRouteInformer cache.SharedIndexInformer, apisixRouteLister kube.ApisixRouteLister) *apisixRouteController { c := &apisixRouteController{ - apisixCommon: common, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoute"), - workers: 1, + apisixCommon: common, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoute"), + relatedWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRouteRelated"), + workers: 1, - svcInformer: common.SvcInformer, - apisixRouteLister: apisixRouteLister, - apisixRouteInformer: apisixRouteInformer, + svcInformer: common.SvcInformer, + apisixRouteLister: apisixRouteLister, + apisixRouteInformer: apisixRouteInformer, + apisixUpstreamInformer: common.ApisixUpstreamInformer, - svcMap: make(map[string]map[string]struct{}), + svcMap: make(map[string]map[string]struct{}), + apisixUpstreamMap: make(map[string]map[string]struct{}), } c.apisixRouteInformer.AddEventHandler( @@ -80,6 +95,12 @@ func newApisixRouteController(common *apisixCommon, apisixRouteInformer cache.Sh AddFunc: c.onSvcAdd, }, ) + c.ApisixUpstreamInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.onApisixUpstreamAdd, + UpdateFunc: c.onApisixUpstreamUpdate, + }, + ) return c } @@ -88,6 +109,7 @@ func (c *apisixRouteController) run(ctx context.Context) { log.Info("ApisixRoute controller started") defer log.Info("ApisixRoute controller exited") defer c.workqueue.ShutDown() + defer c.relatedWorkqueue.ShutDown() ok := cache.WaitForCacheSync(ctx.Done(), c.apisixRouteInformer.HasSynced, c.svcInformer.HasSynced) if !ok { @@ -97,6 +119,7 @@ func (c *apisixRouteController) run(ctx context.Context) { for i := 0; i < c.workers; i++ { go c.runWorker(ctx) + go c.runRelatedWorker(ctx) } <-ctx.Done() } @@ -113,20 +136,40 @@ func (c *apisixRouteController) runWorker(ctx context.Context) { err := c.sync(ctx, val) c.workqueue.Done(obj) c.handleSyncErr(obj, err) - case string: - err := c.handleSvcAdd(val) + } + } +} + +func (c *apisixRouteController) runRelatedWorker(ctx context.Context) { + for { + obj, quit := c.relatedWorkqueue.Get() + if quit { + return + } + + ev := obj.(*routeEvent) + switch ev.Type { + case "service": + err := c.handleSvcAdd(ev.Key) c.workqueue.Done(obj) - c.handleSvcErr(val, err) + c.handleSvcErr(ev, err) + case "ApisixUpstream": + err := c.handleApisixUpstreamChange(ev.Key) + c.workqueue.Done(obj) + c.handleApisixUpstreamErr(ev, err) } } } -func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name string, ar kube.ApisixRoute) { +func (c *apisixRouteController) syncRelationship(ev *types.Event, routeKey string, ar kube.ApisixRoute) { obj := ev.Object.(kube.ApisixRouteEvent) var ( oldBackends []string newBackends []string + + oldUpstreams []string + newUpstreams []string ) switch obj.GroupVersion { case config.ApisixV2beta3: @@ -182,6 +225,10 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st for _, backend := range rule.Backends { oldBackends = append(oldBackends, old.Namespace+"/"+backend.ServiceName) } + + for _, upstream := range rule.Upstreams { + oldUpstreams = append(oldUpstreams, old.Namespace+"/"+upstream.Name) + } } } if newObj != nil { @@ -189,6 +236,9 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st for _, backend := range rule.Backends { newBackends = append(newBackends, newObj.Namespace+"/"+backend.ServiceName) } + for _, upstream := range rule.Upstreams { + newUpstreams = append(newUpstreams, newObj.Namespace+"/"+upstream.Name) + } } } default: @@ -203,19 +253,44 @@ func (c *apisixRouteController) syncServiceRelationship(ev *types.Event, name st // The last event processed MAY not be the logical last event, so it may override the logical previous event // We have a periodic full-sync, which reduce this problem, but it doesn't solve it completely. - c.svcLock.Lock() - defer c.svcLock.Unlock() toDelete := utils.Difference(oldBackends, newBackends) toAdd := utils.Difference(newBackends, oldBackends) + c.syncServiceRelationChanges(routeKey, toAdd, toDelete) + + toDelete = utils.Difference(oldUpstreams, newUpstreams) + toAdd = utils.Difference(newUpstreams, oldUpstreams) + c.syncApisixUpstreamRelationChanges(routeKey, toAdd, toDelete) +} + +func (c *apisixRouteController) syncServiceRelationChanges(routeKey string, toAdd, toDelete []string) { + c.svcLock.Lock() + defer c.svcLock.Unlock() + for _, svc := range toDelete { - delete(c.svcMap[svc], name) + delete(c.svcMap[svc], routeKey) } for _, svc := range toAdd { if _, ok := c.svcMap[svc]; !ok { c.svcMap[svc] = make(map[string]struct{}) } - c.svcMap[svc][name] = struct{}{} + c.svcMap[svc][routeKey] = struct{}{} + } +} + +func (c *apisixRouteController) syncApisixUpstreamRelationChanges(routeKey string, toAdd, toDelete []string) { + c.apisixUpstreamLock.Lock() + defer c.apisixUpstreamLock.Unlock() + + for _, au := range toDelete { + delete(c.apisixUpstreamMap[au], routeKey) + } + + for _, au := range toAdd { + if _, ok := c.apisixUpstreamMap[au]; !ok { + c.apisixUpstreamMap[au] = make(map[string]struct{}) + } + c.apisixUpstreamMap[au][routeKey] = struct{}{} } } @@ -263,7 +338,8 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error } } - c.syncServiceRelationship(ev, name, ar) + // sync before translation + c.syncRelationship(ev, obj.Key, ar) if ev.Type == types.EventDelete { if ar != nil { @@ -592,9 +668,12 @@ func (c *apisixRouteController) ResourceSync() { objs := c.apisixRouteInformer.GetIndexer().List() c.svcLock.Lock() + c.apisixUpstreamLock.Lock() defer c.svcLock.Unlock() + defer c.apisixUpstreamLock.Unlock() c.svcMap = make(map[string]map[string]struct{}) + c.apisixUpstreamMap = make(map[string]map[string]struct{}) for _, obj := range objs { key, err := cache.MetaNamespaceKeyFunc(obj) @@ -616,7 +695,7 @@ func (c *apisixRouteController) ResourceSync() { }, }) - ns, name, err := cache.SplitMetaNamespaceKey(key) + ns, _, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Errorw("split ApisixRoute meta key failed", zap.Error(err), @@ -625,7 +704,10 @@ func (c *apisixRouteController) ResourceSync() { continue } - var backends []string + var ( + backends []string + upstreams []string + ) switch ar.GroupVersion() { case config.ApisixV2beta3: for _, rule := range ar.V2beta3().Spec.HTTP { @@ -638,6 +720,9 @@ func (c *apisixRouteController) ResourceSync() { for _, backend := range rule.Backends { backends = append(backends, ns+"/"+backend.ServiceName) } + for _, upstream := range rule.Upstreams { + upstreams = append(upstreams, ns+"/"+upstream.Name) + } } default: log.Errorw("unknown ApisixRoute version", @@ -649,7 +734,13 @@ func (c *apisixRouteController) ResourceSync() { if _, ok := c.svcMap[svcKey]; !ok { c.svcMap[svcKey] = make(map[string]struct{}) } - c.svcMap[svcKey][name] = struct{}{} + c.svcMap[svcKey][key] = struct{}{} + } + for _, upstreamKey := range upstreams { + if _, ok := c.apisixUpstreamMap[upstreamKey]; !ok { + c.apisixUpstreamMap[upstreamKey] = make(map[string]struct{}) + } + c.apisixUpstreamMap[upstreamKey][key] = struct{}{} } } } @@ -662,7 +753,7 @@ func (c *apisixRouteController) onSvcAdd(obj interface{}) { if err != nil { log.Errorw("found Service with bad meta key", zap.Error(err), - zap.String("key", key), + zap.Any("obj", obj), ) return } @@ -670,29 +761,106 @@ func (c *apisixRouteController) onSvcAdd(obj interface{}) { return } - c.workqueue.Add(key) + c.relatedWorkqueue.Add(&routeEvent{ + Key: key, + Type: "service", + }) } func (c *apisixRouteController) handleSvcAdd(key string) error { - ns, _, err := cache.SplitMetaNamespaceKey(key) + log.Debugw("handle svc add", zap.String("key", key)) + c.svcLock.RLock() + routes, ok := c.svcMap[key] + c.svcLock.RUnlock() + + if ok { + for routeKey := range routes { + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixRouteEvent{ + Key: routeKey, + GroupVersion: c.Kubernetes.APIVersion, + }, + }) + } + } + return nil +} + +func (c *apisixRouteController) handleSvcErr(ev *routeEvent, errOrigin error) { + if errOrigin == nil { + c.workqueue.Forget(ev) + + return + } + + log.Warnw("sync Service failed, will retry", + zap.Any("key", ev.Key), + zap.Error(errOrigin), + ) + c.relatedWorkqueue.AddRateLimited(ev) +} + +func (c *apisixRouteController) onApisixUpstreamAdd(obj interface{}) { + log.Debugw("ApisixUpstream add event arrived", + zap.Any("object", obj), + ) + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("found Service with bad meta key", + zap.Error(err), + zap.Any("obj", obj), + ) + return + } + if !c.namespaceProvider.IsWatchingNamespace(key) { + return + } + + c.relatedWorkqueue.Add(&routeEvent{ + Key: key, + Type: "ApisixUpstream", + }) +} + +func (c *apisixRouteController) onApisixUpstreamUpdate(oldObj, newObj interface{}) { + log.Debugw("ApisixUpstream add event arrived", + zap.Any("object", newObj), + ) + + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err != nil { + log.Errorf("found ApisixUpstream resource with bad meta namespace key: %s", err) + return + } if err != nil { - log.Errorw("failed to split Service meta key", + log.Errorw("found Service with bad meta key", zap.Error(err), - zap.String("key", key), + zap.Any("obj", newObj), ) - return nil + return + } + if !c.namespaceProvider.IsWatchingNamespace(key) { + return } + c.relatedWorkqueue.Add(&routeEvent{ + Key: key, + Type: "ApisixUpstream", + }) +} + +func (c *apisixRouteController) handleApisixUpstreamChange(key string) error { c.svcLock.RLock() - routes, ok := c.svcMap[key] + routes, ok := c.apisixUpstreamMap[key] c.svcLock.RUnlock() if ok { - for route := range routes { + for routeKey := range routes { c.workqueue.Add(&types.Event{ Type: types.EventAdd, Object: kube.ApisixRouteEvent{ - Key: ns + "/" + route, + Key: routeKey, GroupVersion: c.Kubernetes.APIVersion, }, }) @@ -701,18 +869,18 @@ func (c *apisixRouteController) handleSvcAdd(key string) error { return nil } -func (c *apisixRouteController) handleSvcErr(key string, errOrigin error) { +func (c *apisixRouteController) handleApisixUpstreamErr(ev *routeEvent, errOrigin error) { if errOrigin == nil { - c.workqueue.Forget(key) + c.workqueue.Forget(ev) return } - log.Warnw("sync Service failed, will retry", - zap.Any("key", key), + log.Warnw("sync ApisixUpstream add event failed, will retry", + zap.Any("key", ev.Key), zap.Error(errOrigin), ) - c.workqueue.AddRateLimited(key) + c.workqueue.AddRateLimited(ev) } // recordStatus record resources status @@ -792,3 +960,25 @@ func (c *apisixRouteController) recordStatus(at interface{}, reason string, err log.Errorf("unsupported resource record: %s", v) } } + +func (c *apisixRouteController) NotifyServiceAdd(key string) { + if !c.namespaceProvider.IsWatchingNamespace(key) { + return + } + + c.relatedWorkqueue.Add(&routeEvent{ + Key: key, + Type: "service", + }) +} + +func (c *apisixRouteController) NotifyApisixUpstreamChange(key string) { + if !c.namespaceProvider.IsWatchingNamespace(key) { + return + } + + c.relatedWorkqueue.Add(&routeEvent{ + Key: key, + Type: "ApisixUpstream", + }) +} diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go index 4d9e29b239..e3992abc92 100644 --- a/pkg/providers/apisix/apisix_upstream.go +++ b/pkg/providers/apisix/apisix_upstream.go @@ -17,6 +17,7 @@ package apisix import ( "context" "fmt" + "sync" "time" "go.uber.org/zap" @@ -43,25 +44,38 @@ import ( type apisixUpstreamController struct { *apisixCommon - workqueue workqueue.RateLimitingInterface - workers int + workqueue workqueue.RateLimitingInterface + svcWorkqueue workqueue.RateLimitingInterface + workers int svcInformer cache.SharedIndexInformer svcLister listerscorev1.ServiceLister apisixUpstreamInformer cache.SharedIndexInformer apisixUpstreamLister kube.ApisixUpstreamLister + + externalSvcLock sync.RWMutex + // external name service name -> apisix upstream name + externalServiceMap map[string]map[string]struct{} + + // ApisixRouteController don't know how service change affect ApisixUpstream + // So we need to notify it here + notifyApisixUpstreamChange func(string) } -func newApisixUpstreamController(common *apisixCommon) *apisixUpstreamController { +func newApisixUpstreamController(common *apisixCommon, notifyApisixUpstreamChange func(string)) *apisixUpstreamController { c := &apisixUpstreamController{ apisixCommon: common, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstream"), + svcWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreamService"), workers: 1, svcInformer: common.SvcInformer, svcLister: common.SvcLister, apisixUpstreamLister: common.ApisixUpstreamLister, apisixUpstreamInformer: common.ApisixUpstreamInformer, + + externalServiceMap: make(map[string]map[string]struct{}), + notifyApisixUpstreamChange: notifyApisixUpstreamChange, } c.apisixUpstreamInformer.AddEventHandler( @@ -71,6 +85,13 @@ func newApisixUpstreamController(common *apisixCommon) *apisixUpstreamController DeleteFunc: c.onDelete, }, ) + c.svcInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.onSvcAdd, + UpdateFunc: c.onSvcUpdate, + DeleteFunc: c.onSvcDelete, + }, + ) return c } @@ -78,6 +99,7 @@ func (c *apisixUpstreamController) run(ctx context.Context) { log.Info("ApisixUpstream controller started") defer log.Info("ApisixUpstream controller exited") defer c.workqueue.ShutDown() + defer c.svcWorkqueue.ShutDown() if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixUpstreamInformer.HasSynced, c.svcInformer.HasSynced); !ok { log.Error("cache sync failed") @@ -85,6 +107,7 @@ func (c *apisixUpstreamController) run(ctx context.Context) { } for i := 0; i < c.workers; i++ { go c.runWorker(ctx) + go c.runSvcWorker(ctx) } <-ctx.Done() @@ -102,6 +125,19 @@ func (c *apisixUpstreamController) runWorker(ctx context.Context) { } } +func (c *apisixUpstreamController) runSvcWorker(ctx context.Context) { + for { + obj, quit := c.svcWorkqueue.Get() + if quit { + return + } + key := obj.(string) + err := c.handleSvcChange(ctx, key) + c.svcWorkqueue.Done(obj) + c.handleSvcErr(key, err) + } +} + // sync Used to synchronize ApisixUpstream resources, because upstream alone exists in APISIX and will not be affected, // the synchronization logic only includes upstream's unique configuration management // So when ApisixUpstream was deleted, only the scheme / load balancer / healthcheck / retry / timeout @@ -154,6 +190,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er multiVersioned = ev.Tombstone.(kube.ApisixUpstream) } + c.syncRelationship(ev, key, multiVersioned) + switch event.GroupVersion { case config.ApisixV2beta3: au := multiVersioned.V2beta3() @@ -241,9 +279,36 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er } case config.ApisixV2: au := multiVersioned.V2() + if au.Spec == nil { + return nil + } + + if len(au.Spec.ExternalNodes) != 0 { + var newUps *apisixv1.Upstream + if ev.Type != types.EventDelete { + cfg := &au.Spec.ApisixUpstreamConfig + newUps, err = c.translator.TranslateUpstreamConfigV2(cfg) + if err != nil { + log.Errorw("failed to translate upstream config", + zap.Any("object", au), + zap.Error(err), + ) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err + } + } + + err := c.updateExternalNodes(ctx, au, nil, newUps) + if err != nil { + return err + } + + return nil + } var portLevelSettings map[int32]configv2.ApisixUpstreamConfig - if au.Spec != nil && len(au.Spec.PortLevelSettings) > 0 { + if len(au.Spec.PortLevelSettings) > 0 { portLevelSettings = make(map[int32]configv2.ApisixUpstreamConfig, len(au.Spec.PortLevelSettings)) for _, port := range au.Spec.PortLevelSettings { portLevelSettings[port.Port] = port.ApisixUpstreamConfig @@ -260,7 +325,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er var subsets []configv2.ApisixUpstreamSubset subsets = append(subsets, configv2.ApisixUpstreamSubset{}) - if au.Spec != nil && len(au.Spec.Subsets) > 0 { + if len(au.Spec.Subsets) > 0 { subsets = append(subsets, au.Spec.Subsets...) } clusterName := c.Config.APISIX.DefaultClusterName @@ -279,7 +344,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er return err } var newUps *apisixv1.Upstream - if au.Spec != nil && ev.Type != types.EventDelete { + if ev.Type != types.EventDelete { cfg, ok := portLevelSettings[port.Port] if !ok { cfg = au.Spec.ApisixUpstreamConfig @@ -339,6 +404,115 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er return err } +func (c *apisixUpstreamController) updateExternalNodes(ctx context.Context, au *configv2.ApisixUpstream, old *configv2.ApisixUpstream, newUps *apisixv1.Upstream) error { + clusterName := c.Config.APISIX.DefaultClusterName + + // TODO: if old is not nil, diff the external nodes change first + + upsName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name) + + ups, err := c.APISIX.Cluster(clusterName).Upstream().Get(ctx, upsName) + if err != nil { + if err != apisixcache.ErrNotFound { + log.Errorf("failed to get upstream %s: %s", upsName, err) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err + } + // Do nothing if not found + } else { + nodes, err := c.translator.TranslateApisixUpstreamExternalNodes(au) + if err != nil { + log.Errorf("failed to translate upstream external nodes %s: %s", upsName, err) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err + } + if newUps != nil { + newUps.Metadata = ups.Metadata + ups = newUps + } + + ups.Nodes = nodes + if _, err := c.APISIX.Cluster(clusterName).Upstream().Update(ctx, ups); err != nil { + log.Errorw("failed to update external nodes upstream", + zap.Error(err), + zap.Any("upstream", ups), + zap.Any("ApisixUpstream", au), + zap.String("cluster", clusterName), + ) + c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err) + c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration()) + return err + } + } + return nil +} + +func (c *apisixUpstreamController) syncRelationship(ev *types.Event, auKey string, au kube.ApisixUpstream) { + obj := ev.Object.(kube.ApisixUpstreamEvent) + + if obj.GroupVersion != config.ApisixV2 { + return + } + + var ( + old *configv2.ApisixUpstream + newObj *configv2.ApisixUpstream + ) + + if ev.Type == types.EventUpdate { + old = obj.OldObject.V2() + } else if ev.Type == types.EventDelete { + old = ev.Tombstone.(kube.ApisixUpstream).V2() + } + + if ev.Type != types.EventDelete { + newObj = au.V2() + } + + var ( + //oldExternalDomains []string + //newExternalDomains []string + oldExternalServices []string + newExternalServices []string + ) + if old != nil { + for _, node := range old.Spec.ExternalNodes { + if node.Type == configv2.ExternalTypeDomain { + //oldExternalDomains = append(oldExternalDomains, node.Name) + } else if node.Type == configv2.ExternalTypeService { + oldExternalServices = append(oldExternalServices, old.Namespace+"/"+node.Name) + } + } + } + if newObj != nil { + for _, node := range newObj.Spec.ExternalNodes { + if node.Type == configv2.ExternalTypeDomain { + //newExternalDomains = append(newExternalDomains, node.Name) + } else if node.Type == configv2.ExternalTypeService { + newExternalServices = append(newExternalServices, newObj.Namespace+"/"+node.Name) + } + } + } + + c.externalSvcLock.Lock() + defer c.externalSvcLock.Unlock() + + toDelete := utils.Difference(oldExternalServices, newExternalServices) + toAdd := utils.Difference(newExternalServices, oldExternalServices) + for _, svc := range toDelete { + delete(c.externalServiceMap[svc], auKey) + } + + for _, svc := range toAdd { + if _, ok := c.externalServiceMap[svc]; !ok { + c.externalServiceMap[svc] = make(map[string]struct{}) + } + c.externalServiceMap[svc][auKey] = struct{}{} + } +} + func (c *apisixUpstreamController) handleSyncErr(obj interface{}, err error) { if err == nil { c.workqueue.Forget(obj) @@ -494,6 +668,146 @@ func (c *apisixUpstreamController) ResourceSync() { } } +func (c *apisixUpstreamController) onSvcAdd(obj interface{}) { + svc, ok := obj.(*corev1.Service) + if !ok { + log.Errorw("got service add event, but it is not a Service", + zap.Any("obj", obj), + ) + } + + log.Debugw("Service add event arrived", + zap.Any("object", obj), + ) + + if svc.Spec.Type != corev1.ServiceTypeExternalName { + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("found Service with bad meta key", + zap.Error(err), + zap.Any("obj", obj), + ) + return + } + c.svcWorkqueue.Add(key) +} + +func (c *apisixUpstreamController) onSvcUpdate(old, new interface{}) { + oldSvc, ok := old.(*corev1.Service) + if !ok { + log.Errorw("got service update event, but old one is not a Service", + zap.Any("old", old), + ) + } + newSvc, ok := new.(*corev1.Service) + if !ok { + log.Errorw("got service update event, but new one is not a Service", + zap.Any("new", new), + ) + } + + if newSvc.Spec.Type != corev1.ServiceTypeExternalName { + return + } + + if newSvc.Spec.ExternalName != oldSvc.Spec.ExternalName { + key, err := cache.MetaNamespaceKeyFunc(newSvc) + if err != nil { + log.Errorw("found Service with bad meta key", + zap.Error(err), + zap.Any("obj", newSvc), + ) + return + } + c.svcWorkqueue.Add(key) + } +} + +func (c *apisixUpstreamController) onSvcDelete(obj interface{}) { + svc, ok := obj.(*corev1.Service) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + svc, ok = tombstone.Obj.(*corev1.Service) + if !ok { + log.Errorw("got service delete event, but it is not a Service", + zap.Any("obj", obj), + ) + return + } + } + if svc.Spec.Type != corev1.ServiceTypeExternalName { + return + } + + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("found Service with bad meta key", + zap.Error(err), + zap.Any("obj", obj), + ) + return + } + c.svcWorkqueue.Add(key) +} + +func (c *apisixUpstreamController) handleSvcChange(ctx context.Context, key string) error { + var toUpdateUpstreams []string + + c.externalSvcLock.RLock() + if ups, ok := c.externalServiceMap[key]; ok { + for upKey := range ups { + toUpdateUpstreams = append(toUpdateUpstreams, upKey) + } + } + c.externalSvcLock.RUnlock() + + //log.Debugw("handleSvcChange", + // zap.Any("service map", c.externalServiceMap), + // zap.Strings("affectedUpstreams", toUpdateUpstreams), + //) + + for _, upKey := range toUpdateUpstreams { + log.Debugw("Service change event trigger ApisixUpstream sync", + zap.Any("service", key), + zap.Any("ApisixUpstream", upKey), + ) + c.notifyApisixUpstreamChange(upKey) + ns, name, err := cache.SplitMetaNamespaceKey(upKey) + if err != nil { + return err + } + au, err := c.apisixUpstreamLister.V2(ns, name) + if err != nil { + return err + } + err = c.updateExternalNodes(ctx, au.V2(), nil, nil) + if err != nil { + return err + } + } + + return nil +} + +func (c *apisixUpstreamController) handleSvcErr(key string, errOrigin error) { + if errOrigin == nil { + c.workqueue.Forget(key) + return + } + + log.Warnw("sync Service failed, will retry", + zap.Any("key", key), + zap.Error(errOrigin), + ) + c.svcWorkqueue.AddRateLimited(key) +} + // recordStatus record resources status func (c *apisixUpstreamController) recordStatus(at interface{}, reason string, err error, status metav1.ConditionStatus, generation int64) { // build condition diff --git a/pkg/providers/apisix/provider.go b/pkg/providers/apisix/provider.go index 0d97ef32a1..627adcfe67 100644 --- a/pkg/providers/apisix/provider.go +++ b/pkg/providers/apisix/provider.go @@ -51,6 +51,8 @@ type Provider interface { Init(ctx context.Context) error ResourceSync() + NotifyServiceAdd(key string) + NotifyApisixUpstreamChange(key string) SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretMapKey string) } @@ -86,10 +88,12 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch apisixFactory := common.KubeClient.NewAPISIXSharedIndexInformerFactory() p.apisixTranslator = apisixtranslation.NewApisixTranslator(&apisixtranslation.TranslatorOptions{ - Apisix: common.APISIX, - ClusterName: common.Config.APISIX.DefaultClusterName, - ServiceLister: common.SvcLister, - SecretLister: common.SecretLister, + Apisix: common.APISIX, + ClusterName: common.Config.APISIX.DefaultClusterName, + + ApisixUpstreamLister: common.ApisixUpstreamLister, + ServiceLister: common.SvcLister, + SecretLister: common.SecretLister, }, translator) c := &apisixCommon{ Common: common, @@ -137,7 +141,7 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister(), ) - p.apisixUpstreamController = newApisixUpstreamController(c) + p.apisixUpstreamController = newApisixUpstreamController(c, p.NotifyApisixUpstreamChange) p.apisixRouteController = newApisixRouteController(c, p.apisixRouteInformer, apisixRouteLister) p.apisixTlsController = newApisixTlsController(c, p.apisixTlsInformer, apisixTlsLister) p.apisixClusterConfigController = newApisixClusterConfigController(c, p.apisixClusterConfigInformer, apisixClusterConfigLister) @@ -201,6 +205,14 @@ func (p *apisixProvider) ResourceSync() { e.Wait() } +func (p *apisixProvider) NotifyServiceAdd(key string) { + p.apisixRouteController.NotifyServiceAdd(key) +} + +func (p *apisixProvider) NotifyApisixUpstreamChange(key string) { + p.apisixRouteController.NotifyApisixUpstreamChange(key) +} + func (p *apisixProvider) SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretMapKey string) { p.apisixTlsController.SyncSecretChange(ctx, ev, secret, secretMapKey) } diff --git a/pkg/providers/apisix/translation/apisix_plugin.go b/pkg/providers/apisix/translation/apisix_plugin.go index 1115cfe2f0..9a93281fbb 100644 --- a/pkg/providers/apisix/translation/apisix_plugin.go +++ b/pkg/providers/apisix/translation/apisix_plugin.go @@ -66,7 +66,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *translation.TranslateConte }) } - // Finally append the default upstream in the route. + // append the default upstream in the route. wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{ Weight: defaultBackendWeight, }) diff --git a/pkg/providers/apisix/translation/apisix_route.go b/pkg/providers/apisix/translation/apisix_route.go index d5ab4bb6ad..aa22a829f1 100644 --- a/pkg/providers/apisix/translation/apisix_route.go +++ b/pkg/providers/apisix/translation/apisix_route.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "go.uber.org/zap" @@ -361,21 +362,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar return errors.New("duplicated route rule name") } ruleNameMap[part.Name] = struct{}{} - backends := part.Backends - // Use the first backend as the default backend in Route, - // others will be configured in traffic-split plugin. - backend := backends[0] - backends = backends[1:] - - svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) - if err != nil { - log.Errorw("failed to get service port in backend", - zap.Any("backend", backend), - zap.Any("apisix_route", ar), - zap.Error(err), - ) - return err - } var timeout *apisixv1.UpstreamTimeout if part.Timeout != nil { @@ -425,7 +411,10 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar } } - var exprs [][]apisixv1.StringOrSlice + var ( + exprs [][]apisixv1.StringOrSlice + err error + ) if part.Match.NginxVars != nil { exprs, err = t.TranslateRouteMatchExprs(part.Match.NginxVars) if err != nil { @@ -445,7 +434,6 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar return err } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -455,36 +443,158 @@ func (t *translator) translateHTTPRouteV2(ctx *translation.TranslateContext, ar route.Hosts = part.Match.Hosts route.Uris = part.Match.Paths route.Methods = part.Match.Methods - route.UpstreamId = id.GenID(upstreamName) route.EnableWebsocket = part.Websocket route.Plugins = pluginMap route.Timeout = timeout - if part.PluginConfigName != "" { - route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName)) - } + ctx.AddRoute(route) + // --- translate "Backends" --- + backends := part.Backends if len(backends) > 0 { - weight := translation.DefaultWeight - if backend.Weight != nil { - weight = *backend.Weight - } - plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends) + // Use the first backend as the default backend in Route, + // others will be configured in traffic-split plugin. + backend := backends[0] + backends = backends[1:] + + svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(&backend, ar.Namespace) if err != nil { - log.Errorw("failed to translate traffic-split plugin", + log.Errorw("failed to get service port in backend", + zap.Any("backend", backend), + zap.Any("apisix_route", ar), zap.Error(err), - zap.Any("ApisixRoute", ar), ) return err } - route.Plugins["traffic-split"] = plugin + + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort, backend.ResolveGranularity) + route.UpstreamId = id.GenID(upstreamName) + if part.PluginConfigName != "" { + route.PluginConfigId = id.GenID(apisixv1.ComposePluginConfigName(ar.Namespace, part.PluginConfigName)) + } + + if len(backends) > 0 { + weight := translation.DefaultWeight + if backend.Weight != nil { + weight = *backend.Weight + } + plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends) + if err != nil { + log.Errorw("failed to translate traffic-split plugin", + zap.Error(err), + zap.Any("ApisixRoute", ar), + ) + return err + } + route.Plugins["traffic-split"] = plugin + } + if !ctx.CheckUpstreamExist(upstreamName) { + ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + if err != nil { + return err + } + ctx.AddUpstream(ups) + } } - ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateService(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + + if len(part.Backends) == 0 && len(part.Upstreams) > 0 { + // Only have Upstreams + upName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, part.Upstreams[0].Name) + route.UpstreamId = id.GenID(upName) + } + // --- translate Upstreams --- + var ups []*apisixv1.Upstream + for i, au := range part.Upstreams { + up, err := t.translateExternalApisixUpstream(ar.Namespace, au.Name) if err != nil { - return err + log.Errorw(fmt.Sprintf("failed to translate ApisixUpstream at Upstream[%v]", i), + zap.Error(err), + zap.String("apisix_upstream", ar.Namespace+"/"+au.Name), + ) + continue } - ctx.AddUpstream(ups) + if au.Weight != nil { + up.Labels["meta_weight"] = strconv.Itoa(*au.Weight) + } else { + up.Labels["meta_weight"] = strconv.Itoa(translation.DefaultWeight) + } + ups = append(ups, up) + } + + if len(ups) == 0 { + continue + } + + var wups []apisixv1.TrafficSplitConfigRuleWeightedUpstream + if len(part.Backends) == 0 { + if len(ups) > 1 { + for i, up := range ups { + weight, err := strconv.Atoi(up.Labels["meta_weight"]) + if err != nil { + // shouldn't happen + log.Errorw(fmt.Sprintf("failed to parse translated upstream weight at %v", i), + zap.Error(err), + zap.String("meta_weight", up.Labels["meta_weight"]), + ) + continue + } + if i == 0 { + // set as default + wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + } else { + wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: ups[i].ID, + Weight: weight, + }) + } + } + } + } else { + // Mixed backends and upstreams + if cfg, ok := route.Plugins["traffic-split"]; ok { + if tsCfg, ok := cfg.(*apisixv1.TrafficSplitConfig); ok { + wups = tsCfg.Rules[0].WeightedUpstreams + } + } + if len(wups) == 0 { + // append the default upstream in the route. + weight := translation.DefaultWeight + if part.Backends[0].Weight != nil { + weight = *part.Backends[0].Weight + } + wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + } + for i, up := range ups { + weight, err := strconv.Atoi(up.Labels["meta_weight"]) + if err != nil { + // shouldn't happen + log.Errorw(fmt.Sprintf("failed to parse translated upstream weight at %v", i), + zap.Error(err), + zap.String("meta_weight", up.Labels["meta_weight"]), + ) + continue + } + wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: ups[i].ID, + Weight: weight, + }) + } + } + if len(wups) > 0 { + route.Plugins["traffic-split"] = &apisixv1.TrafficSplitConfig{ + Rules: []apisixv1.TrafficSplitConfigRule{ + { + WeightedUpstreams: wups, + }, + }, + } + } + + for _, up := range ups { + ctx.AddUpstream(up) } } return nil @@ -675,11 +785,6 @@ func (t *translator) translateHTTPRouteV2beta3NotStrictly(ctx *translation.Trans // translateHTTPRouteV2NotStrictly translates http route with a loose way, only generate ID and Name for delete Event. func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateContext, ar *configv2.ApisixRoute) error { for _, part := range ar.Spec.HTTP { - backends := part.Backends - // Use the first backend as the default backend in Route, - // others will be configured in traffic-split plugin. - backend := backends[0] - pluginMap := make(apisixv1.Plugins) // add route plugins for _, plugin := range part.Plugins { @@ -711,7 +816,6 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC } } - upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) route := apisixv1.NewDefaultRoute() route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) route.ID = id.GenID(route.Name) @@ -720,12 +824,33 @@ func (t *translator) translateHTTPRouteV2NotStrictly(ctx *translation.TranslateC } ctx.AddRoute(route) - if !ctx.CheckUpstreamExist(upstreamName) { - ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) - if err != nil { - return err + + if len(part.Backends) > 0 { + backends := part.Backends + // Use the first backend as the default backend in Route, + // others will be configured in traffic-split plugin. + backend := backends[0] + + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) + if !ctx.CheckUpstreamExist(upstreamName) { + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal, backend.ResolveGranularity) + if err != nil { + return err + } + ctx.AddUpstream(ups) + } + } + if len(part.Upstreams) > 0 { + upstreams := part.Upstreams + for _, upstream := range upstreams { + upstreamName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, upstream.Name) + if !ctx.CheckUpstreamExist(upstreamName) { + ups := &apisixv1.Upstream{} + ups.Name = upstreamName + ups.ID = id.GenID(ups.Name) + ctx.AddUpstream(ups) + } } - ctx.AddUpstream(ups) } } return nil diff --git a/pkg/providers/apisix/translation/apisix_route_test.go b/pkg/providers/apisix/translation/apisix_route_test.go index c84bdd0e7e..f0a54443b1 100644 --- a/pkg/providers/apisix/translation/apisix_route_test.go +++ b/pkg/providers/apisix/translation/apisix_route_test.go @@ -496,3 +496,248 @@ func TestTranslateApisixRouteV2beta3NotStrictly(t *testing.T) { assert.Equal(t, id.GenID("test_svc1_81"), tx.Upstreams[0].ID, "upstream1 id error") assert.Equal(t, id.GenID("test_svc2_82"), tx.Upstreams[1].ID, "upstream2 id error") } + +func ptrOf[T interface{}](v T) *T { + return &v +} + +func mockTranslatorV2(t *testing.T) (*translator, <-chan struct{}) { + svc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "test", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port1", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 9080, + }, + }, + { + Name: "port2", + Port: 443, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 9443, + }, + }, + }, + }, + } + endpoints := &corev1.Endpoints{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "test", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{ + { + Name: "port1", + Port: 9080, + }, + { + Name: "port2", + Port: 9443, + }, + }, + Addresses: []corev1.EndpointAddress{ + {IP: "192.168.1.1"}, + {IP: "192.168.1.2"}, + }, + }, + }, + } + + au := &configv2.ApisixUpstream{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "au", + Namespace: "test", + }, + Spec: &configv2.ApisixUpstreamSpec{ + ExternalNodes: []configv2.ApisixUpstreamExternalNode{ + { + Name: "httpbin.org", + Type: configv2.ExternalTypeDomain, + Weight: ptrOf(1), + }, + }, + }, + } + + client := fake.NewSimpleClientset() + informersFactory := informers.NewSharedInformerFactory(client, 0) + svcInformer := informersFactory.Core().V1().Services().Informer() + svcLister := informersFactory.Core().V1().Services().Lister() + epLister, epInformer := kube.NewEndpointListerAndInformer(informersFactory, false) + apisixClient := fakeapisix.NewSimpleClientset() + apisixInformersFactory := apisixinformers.NewSharedInformerFactory(apisixClient, 0) + + auInformer := apisixInformersFactory.Apisix().V2().ApisixUpstreams().Informer() + auLister := kube.NewApisixUpstreamLister( + apisixInformersFactory.Apisix().V2beta3().ApisixUpstreams().Lister(), + apisixInformersFactory.Apisix().V2().ApisixUpstreams().Lister(), + ) + + _, err := client.CoreV1().Endpoints("test").Create(context.Background(), endpoints, metav1.CreateOptions{}) + assert.Nil(t, err) + _, err = client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{}) + assert.Nil(t, err) + _, err = apisixClient.ApisixV2().ApisixUpstreams("test").Create(context.Background(), au, metav1.CreateOptions{}) + assert.Nil(t, err) + + tr := &translator{ + &TranslatorOptions{ + ServiceLister: svcLister, + ApisixUpstreamLister: auLister, + }, + translation.NewTranslator(&translation.TranslatorOptions{ + ServiceLister: svcLister, + EndpointLister: epLister, + ApisixUpstreamLister: auLister, + APIVersion: config.ApisixV2, + }), + } + + processCh := make(chan struct{}, 2) + svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + processCh <- struct{}{} + }, + }) + epInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + processCh <- struct{}{} + }, + }) + auInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + processCh <- struct{}{} + }, + }) + + stopCh := make(chan struct{}) + defer close(stopCh) + go svcInformer.Run(stopCh) + go epInformer.Run(stopCh) + go auInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, svcInformer.HasSynced) + + return tr, processCh +} + +func TestTranslateApisixRouteV2WithUpstream(t *testing.T) { + tr, processCh := mockTranslatorV2(t) + <-processCh + <-processCh + + ar := &configv2.ApisixRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ar", + Namespace: "test", + }, + Spec: configv2.ApisixRouteSpec{ + HTTP: []configv2.ApisixRouteHTTP{ + { + Name: "rule1", + Match: configv2.ApisixRouteHTTPMatch{ + Paths: []string{ + "/*", + }, + }, + Backends: []configv2.ApisixRouteHTTPBackend{ + { + ServiceName: "svc", + ServicePort: intstr.IntOrString{ + IntVal: 80, + }, + Weight: ptrOf(2), + }, + }, + Upstreams: []configv2.ApisixRouteUpstreamReference{ + { + Name: "au", + Weight: ptrOf(1), + }, + }, + }, + }, + }, + } + + tctx, err := tr.TranslateRouteV2(ar) + assert.Nil(t, err) + + assert.Equal(t, 1, len(tctx.Routes)) + r := tctx.Routes[0] + + assert.NotNil(t, r.Plugins["traffic-split"]) + + tsCfg, ok := r.Plugins["traffic-split"].(*apisixv1.TrafficSplitConfig) + assert.Equal(t, true, ok) + assert.Equal(t, 1, len(tsCfg.Rules)) + assert.NotNil(t, tsCfg.Rules[0]) + assert.NotNil(t, tsCfg.Rules[0].WeightedUpstreams, "weighted upstreams") + + wups := tsCfg.Rules[0].WeightedUpstreams + + upsName := apisixv1.ComposeExternalUpstreamName(ar.Namespace, "au") + upsID := id.GenID(upsName) + assert.Equal(t, []apisixv1.TrafficSplitConfigRuleWeightedUpstream{ + { + // Default + UpstreamID: "", + Weight: 2, + }, + { + UpstreamID: upsID, + Weight: 1, + }, + }, wups) + + assert.Equal(t, 2, len(tctx.Upstreams)) + var ups *apisixv1.Upstream + for _, u := range tctx.Upstreams { + if u.ID == upsID { + ups = u + break + } + } + assert.NotNil(t, ups) + + // unset useless data + ups.Desc = "" + assert.Equal(t, &apisixv1.Upstream{ + Metadata: apisixv1.Metadata{ + ID: upsID, + Name: upsName, + Desc: "", + Labels: map[string]string{ + "managed-by": "apisix-ingress-controller", + "meta_weight": "1", + }, + }, + Type: apisixv1.LbRoundRobin, + HashOn: "", + Key: "", + Checks: nil, + Nodes: []apisixv1.UpstreamNode{ + { + Host: "httpbin.org", + Port: 80, + Weight: 1, + }, + }, + Scheme: apisixv1.SchemeHTTP, + Retries: nil, + Timeout: nil, + TLS: nil, + }, ups) +} diff --git a/pkg/providers/apisix/translation/apisix_upstream.go b/pkg/providers/apisix/translation/apisix_upstream.go index d3d725af50..e91dcfad7e 100644 --- a/pkg/providers/apisix/translation/apisix_upstream.go +++ b/pkg/providers/apisix/translation/apisix_upstream.go @@ -15,7 +15,16 @@ package translation import ( + "fmt" + "strconv" + "strings" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/apache/apisix-ingress-controller/pkg/id" + v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" "github.com/apache/apisix-ingress-controller/pkg/providers/translation" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" @@ -47,3 +56,102 @@ func (t *translator) translateService(namespace, svcName, subset, svcResolveGran ups.ID = id.GenID(ups.Name) return ups, nil } + +// TODO: Support Port field +func (t *translator) TranslateApisixUpstreamExternalNodes(au *v2.ApisixUpstream) ([]apisixv1.UpstreamNode, error) { + var nodes []apisixv1.UpstreamNode + for i, node := range au.Spec.ExternalNodes { + if node.Type == v2.ExternalTypeDomain { + arr := strings.Split(node.Name, ":") + + weight := translation.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + n := apisixv1.UpstreamNode{ + Host: arr[0], + Weight: weight, + } + + if len(arr) == 1 { + if strings.HasPrefix(arr[0], "https://") { + n.Port = 443 + } else { + n.Port = 80 + } + } else if len(arr) == 2 { + port, err := strconv.Atoi(arr[1]) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to parse ApisixUpstream %s/%s port: at ExternalNodes[%v]: %s", au.Namespace, au.Name, i, node.Name)) + } + + n.Port = port + } + + nodes = append(nodes, n) + } else if node.Type == v2.ExternalTypeService { + svc, err := t.ServiceLister.Services(au.Namespace).Get(node.Name) + if err != nil { + // In theory, ApisixRoute now watches all service add event, a not found error is already handled + if k8serrors.IsNotFound(err) { + // TODO: Should retry + return nil, err + } + return nil, err + } + + if svc.Spec.Type != corev1.ServiceTypeExternalName { + return nil, fmt.Errorf("ApisixUpstream %s/%s ExternalNodes[%v] must refers to a ExternalName service: %s", au.Namespace, au.Name, i, node.Name) + } + + weight := translation.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + n := apisixv1.UpstreamNode{ + Host: svc.Spec.ExternalName, + Weight: weight, + } + + // TODO: Support Port field. This is a temporary solution. + n.Port = 80 + + nodes = append(nodes, n) + } + } + return nodes, nil +} + +// TODO: Retry when ApisixUpstream/ExternalName service not found +func (t *translator) translateExternalApisixUpstream(namespace, upstream string) (*apisixv1.Upstream, error) { + multiVersioned, err := t.ApisixUpstreamLister.V2(namespace, upstream) + if err != nil { + if k8serrors.IsNotFound(err) { + // TODO: Should retry + return nil, err + } + return nil, err + } + + au := multiVersioned.V2() + if len(au.Spec.ExternalNodes) == 0 { + // should do further resolve + return nil, fmt.Errorf("%s/%s has empty ExternalNodes", namespace, upstream) + } + + ups, err := t.TranslateUpstreamConfigV2(&au.Spec.ApisixUpstreamConfig) + if err != nil { + return nil, err + } + ups.Name = apisixv1.ComposeExternalUpstreamName(namespace, upstream) + ups.ID = id.GenID(ups.Name) + + externalNodes, err := t.TranslateApisixUpstreamExternalNodes(au) + if err != nil { + return nil, err + } + + ups.Nodes = append(ups.Nodes, externalNodes...) + + return ups, nil +} diff --git a/pkg/providers/apisix/translation/translator.go b/pkg/providers/apisix/translation/translator.go index 3b89283f4e..a15ff3372f 100644 --- a/pkg/providers/apisix/translation/translator.go +++ b/pkg/providers/apisix/translation/translator.go @@ -32,8 +32,9 @@ type TranslatorOptions struct { Apisix apisix.APISIX ClusterName string - ServiceLister listerscorev1.ServiceLister - SecretLister listerscorev1.SecretLister + ApisixUpstreamLister kube.ApisixUpstreamLister + ServiceLister listerscorev1.ServiceLister + SecretLister listerscorev1.SecretLister } type translator struct { @@ -95,6 +96,9 @@ type ApisixTranslator interface { TranslatePluginConfigV2NotStrictly(*configv2.ApisixPluginConfig) (*translation.TranslateContext, error) TranslateRouteMatchExprs(nginxVars []configv2.ApisixRouteHTTPMatchExpr) ([][]apisixv1.StringOrSlice, error) + + // TranslateApisixUpstreamExternalNodes translates an ApisixUpstream with external nodes to APISIX nodes. + TranslateApisixUpstreamExternalNodes(au *configv2.ApisixUpstream) ([]apisixv1.UpstreamNode, error) } func NewApisixTranslator(opts *TranslatorOptions, t translation.Translator) ApisixTranslator { diff --git a/pkg/providers/utils/string.go b/pkg/providers/utils/string.go index df1968c7d9..414bad236d 100644 --- a/pkg/providers/utils/string.go +++ b/pkg/providers/utils/string.go @@ -24,6 +24,7 @@ func TruncateString(s string, max int) string { } // Difference returns elements only in a +// Duplicated elements are considered as same element func Difference(a, b []string) []string { bMap := make(map[string]struct{}, len(b)) for _, elem := range b { @@ -37,3 +38,10 @@ func Difference(a, b []string) []string { } return onlyInA } + +func Equal(a, b []string) bool { + if len(a) != len(b) { + return false + } + return len(Difference(a, b)) == 0 && len(Difference(b, a)) == 0 +} diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index 20e5d4a7c4..1cf5ee8348 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -507,6 +507,11 @@ func ComposeUpstreamName(namespace, name, subset string, port int32, resolveGran return buf.String() } +// ComposeExternalUpstreamName uses ApisixUpstream namespace, name to compose the upstream name. +func ComposeExternalUpstreamName(namespace, name string) string { + return namespace + "_" + name +} + // ComposeRouteName uses namespace, name and rule name to compose // the route name. func ComposeRouteName(namespace, name string, rule string) string { diff --git a/samples/deploy/crd/v1/ApisixRoute.yaml b/samples/deploy/crd/v1/ApisixRoute.yaml index cf0f8e586a..13654b36c9 100644 --- a/samples/deploy/crd/v1/ApisixRoute.yaml +++ b/samples/deploy/crd/v1/ApisixRoute.yaml @@ -609,7 +609,9 @@ spec: minItems: 1 items: type: object - required: ["name", "match", "backends"] + anyOf: + - required: ["name", "match", "backends"] + - required: ["name", "match", "upstreams"] properties: name: type: string @@ -710,6 +712,17 @@ spec: plugin_config_name: type: string minLength: 1 + upstreams: + description: Upstreams refer to ApisixUpstream CRD + type: array + items: + description: ApisixRouteUpstreamReference contains a ApisixUpstream CRD reference + type: object + properties: + name: + type: string + weight: + type: integer backends: type: array minItems: 1 diff --git a/samples/deploy/crd/v1/ApisixUpstream.yaml b/samples/deploy/crd/v1/ApisixUpstream.yaml index 752efa536e..27d6ef83c0 100644 --- a/samples/deploy/crd/v1/ApisixUpstream.yaml +++ b/samples/deploy/crd/v1/ApisixUpstream.yaml @@ -413,6 +413,19 @@ spec: spec: type: object properties: + externalNodes: + description: ExternalNodes contains external nodes the Upstream should use If this field is set, the upstream will use these nodes directly without any further resolves + type: array + items: + description: ApisixUpstreamExternalNode is the external node conf + type: object + properties: + name: + type: string + type: + type: string + weight: + type: integer subsets: type: array items: diff --git a/test/e2e/go.mod b/test/e2e/go.mod index c4fd20a9e7..f77183deca 100644 --- a/test/e2e/go.mod +++ b/test/e2e/go.mod @@ -11,6 +11,7 @@ require ( github.com/gruntwork-io/terratest v0.40.22 github.com/onsi/ginkgo/v2 v2.2.0 github.com/stretchr/testify v1.8.0 + go.uber.org/zap v1.23.0 k8s.io/api v0.25.2 k8s.io/apimachinery v0.25.2 k8s.io/client-go v0.25.2 @@ -92,7 +93,6 @@ require ( github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - go.uber.org/zap v1.23.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect golang.org/x/net v0.0.0-20220725212005-46097bf591d3 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index 1962f024ea..fbaf74d5ad 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -27,6 +27,7 @@ import ( "time" "github.com/apache/apisix-ingress-controller/pkg/apisix" + "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/metrics" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" "github.com/gruntwork-io/terratest/modules/k8s" @@ -34,6 +35,7 @@ import ( "github.com/gruntwork-io/terratest/modules/testing" ginkgo "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -121,7 +123,16 @@ func (s *Scaffold) CreateApisixRoute(name string, rules []ApisixRouteRule) { // CreateResourceFromString creates resource from a loaded yaml string. func (s *Scaffold) CreateResourceFromString(yaml string) error { - return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml) + err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml) + time.Sleep(5 * time.Second) + + // if the error raised, it may be a &shell.ErrWithCmdOutput, which is useless in debug + if err != nil { + log.Errorw("create resource failed", + zap.Error(err), + ) + } + return err } func (s *Scaffold) DeleteResourceFromString(yaml string) error { diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index ff72d2eb86..614595ad71 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -34,6 +34,7 @@ import ( "time" "github.com/apache/apisix-ingress-controller/pkg/config" + "github.com/apache/apisix-ingress-controller/pkg/log" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gavv/httpexpect/v2" "github.com/gruntwork-io/terratest/modules/k8s" @@ -41,6 +42,7 @@ import ( "github.com/gruntwork-io/terratest/modules/testing" ginkgo "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -640,6 +642,18 @@ func ApisixResourceVersion() *apisixResourceVersionInfo { return apisixResourceVersion } +func (s *Scaffold) DeleteResource(resourceType, name string) error { + err := k8s.RunKubectlE(s.t, s.kubectlOptions, "delete", resourceType, name) + if err != nil { + log.Errorw("delete resource failed", + zap.Error(err), + zap.String("resource", resourceType), + zap.String("name", name), + ) + } + return err +} + func (s *Scaffold) NamespaceSelectorLabelStrings() []string { var labels []string for k, v := range s.opts.NamespaceSelectorLabel { diff --git a/test/e2e/suite-features/external-service.go b/test/e2e/suite-features/external-service.go new file mode 100644 index 0000000000..03fbc24aa9 --- /dev/null +++ b/test/e2e/suite-features/external-service.go @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package features + +import ( + "fmt" + "net/http" + "reflect" + "time" + + "github.com/apache/apisix-ingress-controller/pkg/id" + v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2" + "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/providers/translation" + "github.com/apache/apisix-ingress-controller/pkg/types" + apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" + "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = ginkgo.Describe("suite-features: external services", func() { + PhaseCreateExternalService := func(s *scaffold.Scaffold, name, externalName string) { + extService := fmt.Sprintf(` +apiVersion: v1 +kind: Service +metadata: + name: %s +spec: + type: ExternalName + externalName: %s +`, name, externalName) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(extService)) + } + PhaseCreateApisixRoute := func(s *scaffold.Scaffold, name, upstream string) { + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: %s +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.org + paths: + - /* + exprs: + - subject: + scope: Header + name: X-Foo + op: Equal + value: bar + upstreams: + - name: %s +`, name, upstream) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar)) + } + + PhaseCreateApisixRouteWithHostRewrite := func(s *scaffold.Scaffold, name, upstream, rewriteHost string) { + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: %s +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.org + paths: + - /* + exprs: + - subject: + scope: Header + name: X-Foo + op: Equal + value: bar + upstreams: + - name: %s + plugins: + - name: proxy-rewrite + enable: true + config: + host: %s +`, name, upstream, rewriteHost) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar)) + } + + PhaseCreateApisixUpstream := func(s *scaffold.Scaffold, name string, nodeType v2.ApisixUpstreamExternalType, nodeName string) { + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: %s +spec: + externalNodes: + - type: %s + name: %s +`, name, nodeType, nodeName) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au)) + } + + PhaseValidateNoUpstreams := func(s *scaffold.Scaffold) { + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 0, "upstream count") + } + + PhaseValidateNoRoutes := func(s *scaffold.Scaffold) { + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 0, "route count") + } + + PhaseValidateFirstUpstream := func(s *scaffold.Scaffold, length int, node string, port, weight int) string { + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, length, "upstream count") + upstream := ups[0] + assert.Len(ginkgo.GinkgoT(), upstream.Nodes, 1) + assert.Equal(ginkgo.GinkgoT(), node, upstream.Nodes[0].Host) + assert.Equal(ginkgo.GinkgoT(), port, upstream.Nodes[0].Port) + assert.Equal(ginkgo.GinkgoT(), weight, upstream.Nodes[0].Weight) + + return upstream.ID + } + + PhaseValidateRouteAccess := func(s *scaffold.Scaffold, upstreamId string) { + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 1, "route count") + assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId) + + _ = s.NewAPISIXClient().GET("/ip"). + WithHeader("Host", "httpbin.org"). + WithHeader("X-Foo", "bar"). + Expect(). + Status(http.StatusOK) + } + + PhaseValidateRouteAccessCode := func(s *scaffold.Scaffold, upstreamId string, code int) { + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 1, "route count") + assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId) + + _ = s.NewAPISIXClient().GET("/ip"). + WithHeader("Host", "httpbin.org"). + WithHeader("X-Foo", "bar"). + Expect(). + Status(code) + } + + PhaseCreateHttpbin := func(s *scaffold.Scaffold, name string) string { + _httpbinDeploymentTemplate := fmt.Sprintf(` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: %s +spec: + replicas: 1 + selector: + matchLabels: + app: %s + strategy: + rollingUpdate: + maxSurge: 50%% + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: %s + spec: + terminationGracePeriodSeconds: 0 + containers: + - livenessProbe: + failureThreshold: 3 + initialDelaySeconds: 2 + periodSeconds: 5 + successThreshold: 1 + tcpSocket: + port: 80 + timeoutSeconds: 2 + readinessProbe: + failureThreshold: 3 + initialDelaySeconds: 2 + periodSeconds: 5 + successThreshold: 1 + tcpSocket: + port: 80 + timeoutSeconds: 2 + image: "localhost:5000/kennethreitz/httpbin:dev" + imagePullPolicy: IfNotPresent + name: httpbin + ports: + - containerPort: 80 + name: "http" + protocol: "TCP" +`, name, name, name) + _httpService := fmt.Sprintf(` +apiVersion: v1 +kind: Service +metadata: + name: %s +spec: + selector: + app: %s + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 80 + type: ClusterIP +`, name, name) + + err := s.CreateResourceFromString(s.FormatRegistry(_httpbinDeploymentTemplate)) + assert.Nil(ginkgo.GinkgoT(), err, "create temp httpbin deployment") + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(_httpService), "create temp httpbin service") + + return fmt.Sprintf("httpbin-temp.%s.svc.cluster.local", s.Namespace()) + } + + // Cases: + // --- Basic Function --- + // 1. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to third-party service + // 2. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to ExternalName service + // 3. ApisixRoute refers to ApisixUpstream, ApisixUpstream refers to multiple third-party or ExternalName services + // 4. ApisixRoute refers to ApisixUpstream and Backends, ApisixUpstream refers to ExternalName service + // --- Update Cases --- + // o 1. ApisixRoute refers to ApisixUpstream, but the ApisixUpstream is created later + // o 2. ApisixRoute refers to ApisixUpstream, but the ExternalName service is created later + // o 3. ApisixRoute refers to ApisixUpstream, but the ApisixUpstream is updated and change to another ExternalName service + // o 4. ApisixRoute refers to ApisixUpstream, the ApisixUpstream doesn't change, but the ExternalName service itself is updated + // --- Delete Cases --- + // 1. ApisixRoute is deleted, the generated resources should be removed + + s := scaffold.NewDefaultV2Scaffold() + + ginkgo.Describe("basic function: ", func() { + ginkgo.It("should be able to access third-party service", func() { + // -- Data preparation -- + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to access third-party service with plugins", func() { + // -- Data preparation -- + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbun.org") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + + // -- Expect failed -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbun.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccessCode(s, upstreamId, http.StatusBadGateway) + + // -- update -- + PhaseCreateApisixRouteWithHostRewrite(s, "httpbin-route", "httpbin-upstream", "httpbun.org") + + // -- validation -- + upstreamId = PhaseValidateFirstUpstream(s, 1, "httpbun.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to access external domain ExternalName service", func() { + // -- Data preparation -- + PhaseCreateExternalService(s, "ext-httpbin", "httpbin.org") + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to access in-cluster ExternalName service", func() { + // -- Data preparation -- + fqdn := PhaseCreateHttpbin(s, "httpbin-temp") + + // We are only testing the functionality of the external service and do not care which namespace the service is in. + // The namespace of the external service should be watched. + PhaseCreateExternalService(s, "ext-httpbin", fqdn) + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + }) + ginkgo.Describe("complex usage: ", func() { + PhaseCreateApisixUpstreamWithMultipleExternalNodes := func(s *scaffold.Scaffold, name string, + nodeTypeA v2.ApisixUpstreamExternalType, nodeNameA string, nodeTypeB v2.ApisixUpstreamExternalType, nodeNameB string) { + au := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: %s +spec: + externalNodes: + - type: %s + name: %s + - type: %s + name: %s +`, name, nodeTypeA, nodeNameA, nodeTypeB, nodeNameB) + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au)) + } + + PhaseCreateApisixRouteWithHostRewriteAndBackend := func(s *scaffold.Scaffold, name, upstream, hostRewrite, serviceName string, servicePort int) { + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: %s +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.org + paths: + - /* + exprs: + - subject: + scope: Header + name: X-Foo + op: Equal + value: bar + upstreams: + - name: %s + backends: + - serviceName: %s + servicePort: %d + resolveGranularity: service + plugins: + - name: proxy-rewrite + enable: true + config: + host: %s +`, name, upstream, serviceName, servicePort, hostRewrite) + + assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(ar)) + } + + validateHttpbinAndHttpbunAreAccessed := func() { + hasPoweredBy := false // httpbun.org + hasNoPoweredBy := false // httpbin.org + for i := 0; i < 20; i++ { + headers := s.NewAPISIXClient().GET("/ip"). + WithHeader("Host", "httpbin.org"). + WithHeader("X-Foo", "bar"). + Expect(). + Status(http.StatusOK). + Headers().Raw() + if val, ok := headers["X-Powered-By"]; ok { + switch value := val.(type) { + case []interface{}: + forloop: + for _, header := range value { + switch vv := header.(type) { + case string: + if vv == "httpbun" { + hasPoweredBy = true + break forloop + } + default: + log.Errorw("type", zap.Any("type", reflect.TypeOf(val))) + } + } + default: + log.Errorw("type", zap.Any("type", reflect.TypeOf(val))) + } + } else { + hasNoPoweredBy = true + } + if hasPoweredBy && hasNoPoweredBy { + break + } + } + + assert.True(ginkgo.GinkgoT(), hasPoweredBy && hasNoPoweredBy, "both httpbin and httpbun should be accessed at least once") + } + + type validateFactor struct { + port int + weight int + } + // Note: expected nodes has unique host + PhaseValidateMultipleNodes := func(s *scaffold.Scaffold, length int, nodes map[string]*validateFactor) { + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, 1, "upstream count") + + upstream := ups[0] + assert.Len(ginkgo.GinkgoT(), upstream.Nodes, length) + for _, node := range upstream.Nodes { + host := node.Host + if factor, ok := nodes[host]; ok { + assert.Equal(ginkgo.GinkgoT(), factor.port, node.Port) + assert.Equal(ginkgo.GinkgoT(), factor.weight, node.Weight) + } else { + err := fmt.Errorf("host %s appear but it shouldn't", host) + assert.Nil(ginkgo.GinkgoT(), err) + } + } + + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 1, "route count") + assert.Equal(ginkgo.GinkgoT(), ups[0].ID, routes[0].UpstreamId) + + validateHttpbinAndHttpbunAreAccessed() + } + + // Note: expected nodes has unique host + PhaseValidateTrafficSplit := func(s *scaffold.Scaffold, length int, upstreamId string, nodes map[string]*validateFactor) { + ups, err := s.ListApisixUpstreams() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), ups, length, "upstream count") + + for _, upstream := range ups { + assert.Len(ginkgo.GinkgoT(), upstream.Nodes, 1) + host := upstream.Nodes[0].Host + if factor, ok := nodes[host]; ok { + assert.Equal(ginkgo.GinkgoT(), factor.port, upstream.Nodes[0].Port) + assert.Equal(ginkgo.GinkgoT(), factor.weight, upstream.Nodes[0].Weight) + } else { + err := fmt.Errorf("host %s appear but it shouldn't", host) + assert.Nil(ginkgo.GinkgoT(), err) + } + } + + routes, err := s.ListApisixRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), routes, 1, "route count") + assert.Equal(ginkgo.GinkgoT(), upstreamId, routes[0].UpstreamId) + + validateHttpbinAndHttpbunAreAccessed() + } + + ginkgo.It("should be able to access multiple external services", func() { + // -- Data preparation -- + PhaseCreateApisixUpstreamWithMultipleExternalNodes(s, "httpbin-upstream", + v2.ExternalTypeDomain, "httpbin.org", v2.ExternalTypeDomain, "httpbun.org") + PhaseCreateApisixRouteWithHostRewrite(s, "httpbin-route", "httpbin-upstream", "httpbun.org") + + // -- validation -- + PhaseValidateMultipleNodes(s, 2, map[string]*validateFactor{ + "httpbin.org": { + port: 80, + weight: translation.DefaultWeight, + }, + "httpbun.org": { + port: 80, + weight: translation.DefaultWeight, + }, + }) + }) + ginkgo.It("should be able to use backends and upstreams together", func() { + // -- Data preparation -- + PhaseCreateHttpbin(s, "httpbin-temp") + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbun.org") + PhaseCreateApisixRouteWithHostRewriteAndBackend(s, "httpbin-route", "httpbin-upstream", "httpbun.org", "httpbin-temp", 80) + + svc, err := s.GetServiceByName("httpbin-temp") + assert.Nil(ginkgo.GinkgoT(), err, "get httpbin service") + ip := svc.Spec.ClusterIP + + upName := apisixv1.ComposeUpstreamName(s.Namespace(), "httpbin-temp", "", 80, types.ResolveGranularity.Service) + upID := id.GenID(upName) + + // -- validation -- + PhaseValidateTrafficSplit(s, 2, upID, map[string]*validateFactor{ + ip: { + port: 80, + weight: translation.DefaultWeight, + }, + "httpbun.org": { + port: 80, + weight: translation.DefaultWeight, + }, + }) + }) + }) + ginkgo.Describe("update function: ", func() { + ginkgo.It("should be able to create the ApisixUpstream later", func() { + // -- Data preparation -- + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + PhaseValidateNoUpstreams(s) + + // -- Data Update -- + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to create the ExternalName service later", func() { + // -- Data preparation -- + fqdn := PhaseCreateHttpbin(s, "httpbin-temp") + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + PhaseValidateNoUpstreams(s) + + // -- Data update -- + PhaseCreateExternalService(s, "ext-httpbin", fqdn) + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to update the ApisixUpstream later", func() { + // -- Data preparation -- + fqdn := PhaseCreateHttpbin(s, "httpbin-temp") + PhaseCreateExternalService(s, "ext-httpbin", fqdn) + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "doesnt-exist") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + PhaseValidateNoUpstreams(s) + + // -- Data update -- + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, fqdn, 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + ginkgo.It("should be able to update the ExternalName service later", func() { + // -- Data preparation -- + PhaseCreateExternalService(s, "ext-httpbin", "unknown.org") + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeService, "ext-httpbin") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + PhaseValidateFirstUpstream(s, 1, "unknown.org", 80, translation.DefaultWeight) + + // -- Data update -- + PhaseCreateExternalService(s, "ext-httpbin", "httpbin.org") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + }) + }) + ginkgo.Describe("delete function: ", func() { + ginkgo.It("should be able to delete resources", func() { + // -- Data preparation -- + PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "httpbin.org") + PhaseCreateApisixRoute(s, "httpbin-route", "httpbin-upstream") + + // -- validation -- + upstreamId := PhaseValidateFirstUpstream(s, 1, "httpbin.org", 80, translation.DefaultWeight) + PhaseValidateRouteAccess(s, upstreamId) + + // -- delete -- + assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("ar", "httpbin-route"), "delete route") + assert.Nil(ginkgo.GinkgoT(), s.DeleteResource("au", "httpbin-upstream"), "delete upstream") + time.Sleep(time.Second * 15) + + // -- validate -- + PhaseValidateNoRoutes(s) + PhaseValidateNoUpstreams(s) + }) + }) +})