From 1c17b41249361444b5b10f4a8897f62484b545b0 Mon Sep 17 00:00:00 2001 From: kv Date: Sat, 10 Jul 2021 21:20:36 +0800 Subject: [PATCH] feat: add logic for ApisixRoute v2beta1 (#576) --- pkg/config/config.go | 2 + pkg/ingress/apisix_route.go | 61 +++-- pkg/ingress/controller.go | 10 +- pkg/ingress/pod.go | 2 +- pkg/kube/apisix/apis/config/v2beta1/types.go | 51 +--- .../config/v2beta1/zz_generated.deepcopy.go | 50 +--- pkg/kube/apisix_route.go | 23 +- pkg/kube/translation/apisix_route.go | 231 +++++++++++++++++- pkg/kube/translation/plugin.go | 6 +- pkg/kube/translation/plugin_test.go | 10 +- pkg/kube/translation/translator.go | 7 + pkg/kube/translation/util.go | 63 ++++- samples/deploy/crd/v1beta1/ApisixRoute.yaml | 4 +- test/e2e/ingress/stream.go | 108 ++++++++ test/e2e/scaffold/apisix.go | 4 + test/e2e/scaffold/ingress.go | 2 +- test/e2e/scaffold/k8s.go | 11 + test/e2e/scaffold/scaffold.go | 18 ++ test/e2e/testdata/apisix-gw-config.yaml | 2 + 19 files changed, 535 insertions(+), 130 deletions(-) create mode 100644 test/e2e/ingress/stream.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 8944607e7f..c7c7d2424e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -50,6 +50,8 @@ const ( ApisixRouteV1 = "apisix.apache.org/v1" // ApisixRouteV2alpha1 represents apisixroute.apisix.apache.org/v2alpha1 ApisixRouteV2alpha1 = "apisix.apache.org/v2alpha1" + // ApisixRouteV2beta1 represents apisixroute.apisix.apache.org/v2beta1 + ApisixRouteV2beta1 = "apisix.apache.org/v2beta1" _minimalResyncInterval = 30 * time.Second ) diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go index 5a1a6c36e5..de99a5d569 100644 --- a/pkg/ingress/apisix_route.go +++ b/pkg/ingress/apisix_route.go @@ -93,12 +93,14 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error ar kube.ApisixRoute tctx *translation.TranslateContext ) - if obj.GroupVersion == kube.ApisixRouteV1 { + switch obj.GroupVersion { + case kube.ApisixRouteV1: ar, err = c.controller.apisixRouteLister.V1(namespace, name) - } else { + case kube.ApisixRouteV2alpha1: ar, err = c.controller.apisixRouteLister.V2alpha1(namespace, name) + case kube.ApisixRouteV2beta1: + ar, err = c.controller.apisixRouteLister.V2beta1(namespace, name) } - if err != nil { if !k8serrors.IsNotFound(err) { log.Errorw("failed to get ApisixRoute", @@ -129,7 +131,9 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error } ar = ev.Tombstone.(kube.ApisixRoute) } - if obj.GroupVersion == kube.ApisixRouteV1 { + // + switch obj.GroupVersion { + case kube.ApisixRouteV1: tctx, err = c.controller.translator.TranslateRouteV1(ar.V1()) if err != nil { log.Errorw("failed to translate ApisixRoute v1", @@ -138,7 +142,7 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error ) return err } - } else { + case kube.ApisixRouteV2alpha1: if ev.Type != types.EventDelete { tctx, err = c.controller.translator.TranslateRouteV2alpha1(ar.V2alpha1()) } else { @@ -154,6 +158,19 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error ) return err } + case kube.ApisixRouteV2beta1: + if ev.Type != types.EventDelete { + tctx, err = c.controller.translator.TranslateRouteV2beta1(ar.V2beta1()) + } else { + tctx, err = c.controller.translator.TranslateRouteV2beta1NotStrictly(ar.V2beta1()) + } + if err != nil { + log.Errorw("failed to translate ApisixRoute v2beta1", + zap.Error(err), + zap.Any("object", ar), + ) + return err + } } log.Debugw("translated ApisixRoute", @@ -180,18 +197,21 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error added = m } else { var oldCtx *translation.TranslateContext - if obj.GroupVersion == kube.ApisixRouteV1 { + switch obj.GroupVersion { + case kube.ApisixRouteV1: oldCtx, err = c.controller.translator.TranslateRouteV1(obj.OldObject.V1()) - } else { + case kube.ApisixRouteV2alpha1: oldCtx, err = c.controller.translator.TranslateRouteV2alpha1(obj.OldObject.V2alpha1()) + case kube.ApisixRouteV2beta1: + oldCtx, err = c.controller.translator.TranslateRouteV2beta1(obj.OldObject.V2beta1()) } if err != nil { - log.Errorw("failed to translate old ApisixRoute v2alpha1", + log.Errorw("failed to translate old ApisixRoute", + zap.String("version", obj.GroupVersion), zap.String("event", "update"), zap.Error(err), zap.Any("ApisixRoute", ar), ) - return err } @@ -215,19 +235,26 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error) return } var ar kube.ApisixRoute - if event.GroupVersion == kube.ApisixRouteV1 { + switch event.GroupVersion { + case kube.ApisixRouteV1: ar, errLocal = c.controller.apisixRouteLister.V1(namespace, name) - } else { + case kube.ApisixRouteV2alpha1: ar, errLocal = c.controller.apisixRouteLister.V2alpha1(namespace, name) + case kube.ApisixRouteV2beta1: + ar, errLocal = c.controller.apisixRouteLister.V2beta1(namespace, name) } if errOrigin == nil { if ev.Type != types.EventDelete { if errLocal == nil { - if ar.GroupVersion() == kube.ApisixRouteV1 { + switch ar.GroupVersion() { + case kube.ApisixRouteV1: c.controller.recorderEvent(ar.V1(), v1.EventTypeNormal, _resourceSynced, nil) - } else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 { + case kube.ApisixRouteV2alpha1: c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeNormal, _resourceSynced, nil) c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue) + case kube.ApisixRouteV2beta1: + c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeNormal, _resourceSynced, nil) + c.controller.recordStatus(ar.V2beta1(), _resourceSynced, nil, metav1.ConditionTrue) } } else { log.Errorw("failed list ApisixRoute", @@ -245,11 +272,15 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error) zap.Error(errOrigin), ) if errLocal == nil { - if ar.GroupVersion() == kube.ApisixRouteV1 { + switch ar.GroupVersion() { + case kube.ApisixRouteV1: c.controller.recorderEvent(ar.V1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin) - } else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 { + case kube.ApisixRouteV2alpha1: c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin) c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse) + case kube.ApisixRouteV2beta1: + c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin) + c.controller.recordStatus(ar.V2beta1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse) } } else { log.Errorw("failed list ApisixRoute", diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index 0e7c6202a0..7694f0b017 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -194,6 +194,7 @@ func (c *Controller) initWhenStartLeading() { c.apisixRouteLister = kube.NewApisixRouteLister( apisixFactory.Apisix().V1().ApisixRoutes().Lister(), apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(), + apisixFactory.Apisix().V2beta1().ApisixRoutes().Lister(), ) c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister() c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister() @@ -217,10 +218,13 @@ func (c *Controller) initWhenStartLeading() { } else { ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer() } - if c.cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 { - apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer() - } else { + switch c.cfg.Kubernetes.ApisixRouteVersion { + case config.ApisixRouteV1: apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer() + case config.ApisixRouteV2alpha1: + apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer() + case config.ApisixRouteV2beta1: + apisixRouteInformer = apisixFactory.Apisix().V2beta1().ApisixRoutes().Informer() } c.podInformer = kubeFactory.Core().V1().Pods().Informer() diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go index 624ad01fd4..a5b1cd210b 100644 --- a/pkg/ingress/pod.go +++ b/pkg/ingress/pod.go @@ -65,7 +65,7 @@ func (c *podController) onAdd(obj interface{}) { return } log.Debugw("pod add event arrived", - zap.Any("object", obj), + zap.String("obj.key", key), ) pod := obj.(*corev1.Pod) if err := c.controller.podCache.Add(pod); err != nil { diff --git a/pkg/kube/apisix/apis/config/v2beta1/types.go b/pkg/kube/apisix/apis/config/v2beta1/types.go index d2d43358ab..21eb5f66bf 100644 --- a/pkg/kube/apisix/apis/config/v2beta1/types.go +++ b/pkg/kube/apisix/apis/config/v2beta1/types.go @@ -19,6 +19,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" ) // +genclient @@ -53,14 +55,14 @@ type ApisixRouteHTTP struct { Priority int `json:"priority,omitempty" yaml:"priority,omitempty"` Match ApisixRouteHTTPMatch `json:"match,omitempty" yaml:"match,omitempty"` // Deprecated: Backend will be removed in the future, use Backends instead. - Backend ApisixRouteHTTPBackend `json:"backend" yaml:"backend"` + Backend v2alpha1.ApisixRouteHTTPBackend `json:"backend" yaml:"backend"` // Backends represents potential backends to proxy after the route // rule matched. When number of backends are more than one, traffic-split // plugin in APISIX will be used to split traffic based on the backend weight. - Backends []ApisixRouteHTTPBackend `json:"backends" yaml:"backends"` - Websocket bool `json:"websocket" yaml:"websocket"` - Plugins []ApisixRouteHTTPPlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"` - Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"` + Backends []v2alpha1.ApisixRouteHTTPBackend `json:"backends" yaml:"backends"` + Websocket bool `json:"websocket" yaml:"websocket"` + Plugins []ApisixRouteHTTPPlugin `json:"plugins,omitempty" yaml:"plugins,omitempty"` + Authentication ApisixRouteAuthentication `json:"authentication,omitempty" yaml:"authentication,omitempty"` } // ApisixRouteHTTPMatch represents the match condition for hitting this route. @@ -90,25 +92,7 @@ type ApisixRouteHTTPMatch struct { // value: // - "127.0.0.1" // - "10.0.5.11" - NginxVars []ApisixRouteHTTPMatchExpr `json:"exprs,omitempty" yaml:"exprs,omitempty"` -} - -// ApisixRouteHTTPMatchExpr represents a binary route match expression . -type ApisixRouteHTTPMatchExpr struct { - // Subject is the expression subject, it can - // be any string composed by literals and nginx - // vars. - Subject ApisixRouteHTTPMatchExprSubject `json:"subject" yaml:"subject"` - // Op is the operator. - Op string `json:"op" yaml:"op"` - // Set is an array type object of the expression. - // It should be used when the Op is "in" or "not_in"; - Set []string `json:"set" yaml:"set"` - // Value is the normal type object for the expression, - // it should be used when the Op is not "in" and "not_in". - // Set and Value are exclusive so only of them can be set - // in the same time. - Value string `json:"value" yaml:"value"` + NginxVars []v2alpha1.ApisixRouteHTTPMatchExpr `json:"exprs,omitempty" yaml:"exprs,omitempty"` } // ApisixRouteHTTPMatchExprSubject describes the route match expression subject. @@ -122,25 +106,6 @@ type ApisixRouteHTTPMatchExprSubject struct { Name string `json:"name" yaml:"name"` } -// ApisixRouteHTTPBackend represents a HTTP backend (a Kuberentes Service). -type ApisixRouteHTTPBackend struct { - // The name (short) of the service, note cross namespace is forbidden, - // so be sure the ApisixRoute and Service are in the same namespace. - ServiceName string `json:"serviceName" yaml:"serviceName"` - // The service port, could be the name or the port number. - ServicePort intstr.IntOrString `json:"servicePort" yaml:"servicePort"` - // The resolve granularity, can be "endpoints" or "service", - // when set to "endpoints", the pod ips will be used; other - // wise, the service ClusterIP or ExternalIP will be used, - // default is endpoints. - ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"` - // Weight of this backend. - Weight int `json:"weight" yaml:"weight"` - // Subset specifies a subset for the target Service. The subset should be pre-defined - // in ApisixUpstream about this service. - Subset string `json:"subset" yaml:"subset"` -} - // ApisixRouteHTTPPlugin represents an APISIX plugin. type ApisixRouteHTTPPlugin struct { // The plugin name. diff --git a/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go index ac06d4d04f..f064c5e242 100644 --- a/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go +++ b/pkg/kube/apisix/apis/config/v2beta1/zz_generated.deepcopy.go @@ -20,6 +20,7 @@ package v2beta1 import ( + v2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -89,11 +90,13 @@ func (in *ApisixRouteAuthenticationKeyAuth) DeepCopy() *ApisixRouteAuthenticatio func (in *ApisixRouteHTTP) DeepCopyInto(out *ApisixRouteHTTP) { *out = *in in.Match.DeepCopyInto(&out.Match) - out.Backend = in.Backend + in.Backend.DeepCopyInto(&out.Backend) if in.Backends != nil { in, out := &in.Backends, &out.Backends - *out = make([]ApisixRouteHTTPBackend, len(*in)) - copy(*out, *in) + *out = make([]v2alpha1.ApisixRouteHTTPBackend, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.Plugins != nil { in, out := &in.Plugins, &out.Plugins @@ -116,23 +119,6 @@ func (in *ApisixRouteHTTP) DeepCopy() *ApisixRouteHTTP { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ApisixRouteHTTPBackend) DeepCopyInto(out *ApisixRouteHTTPBackend) { - *out = *in - out.ServicePort = in.ServicePort - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteHTTPBackend. -func (in *ApisixRouteHTTPBackend) DeepCopy() *ApisixRouteHTTPBackend { - if in == nil { - return nil - } - out := new(ApisixRouteHTTPBackend) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApisixRouteHTTPMatch) DeepCopyInto(out *ApisixRouteHTTPMatch) { *out = *in @@ -158,7 +144,7 @@ func (in *ApisixRouteHTTPMatch) DeepCopyInto(out *ApisixRouteHTTPMatch) { } if in.NginxVars != nil { in, out := &in.NginxVars, &out.NginxVars - *out = make([]ApisixRouteHTTPMatchExpr, len(*in)) + *out = make([]v2alpha1.ApisixRouteHTTPMatchExpr, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -176,28 +162,6 @@ func (in *ApisixRouteHTTPMatch) DeepCopy() *ApisixRouteHTTPMatch { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ApisixRouteHTTPMatchExpr) DeepCopyInto(out *ApisixRouteHTTPMatchExpr) { - *out = *in - out.Subject = in.Subject - if in.Set != nil { - in, out := &in.Set, &out.Set - *out = make([]string, len(*in)) - copy(*out, *in) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixRouteHTTPMatchExpr. -func (in *ApisixRouteHTTPMatchExpr) DeepCopy() *ApisixRouteHTTPMatchExpr { - if in == nil { - return nil - } - out := new(ApisixRouteHTTPMatchExpr) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApisixRouteHTTPMatchExprSubject) DeepCopyInto(out *ApisixRouteHTTPMatchExprSubject) { *out = *in diff --git a/pkg/kube/apisix_route.go b/pkg/kube/apisix_route.go index b51a0d6c97..e5274c151c 100644 --- a/pkg/kube/apisix_route.go +++ b/pkg/kube/apisix_route.go @@ -22,6 +22,7 @@ import ( configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1" listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1" listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1" + listersv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2beta1" ) const ( @@ -40,6 +41,8 @@ type ApisixRouteLister interface { V1(string, string) (ApisixRoute, error) // V2alpha1 gets the ApisixRoute in apisix.apache.org/v2alpha1. V2alpha1(string, string) (ApisixRoute, error) + // V2beta1 gets the ApisixRoute in apisix.apache.org/v2beta1. + V2beta1(string, string) (ApisixRoute, error) } // ApisixRouteInformer is an encapsulation for the informer of ApisixRoute, @@ -118,6 +121,7 @@ func (ar *apisixRoute) ResourceVersion() string { type apisixRouteLister struct { v1Lister listersv1.ApisixRouteLister v2alpha1Lister listersv2alpha1.ApisixRouteLister + v2beta1Lister listersv2beta1.ApisixRouteLister } func (l *apisixRouteLister) V1(namespace, name string) (ApisixRoute, error) { @@ -142,6 +146,17 @@ func (l *apisixRouteLister) V2alpha1(namespace, name string) (ApisixRoute, error }, nil } +func (l *apisixRouteLister) V2beta1(namespace, name string) (ApisixRoute, error) { + ar, err := l.v2beta1Lister.ApisixRoutes(namespace).Get(name) + if err != nil { + return nil, err + } + return &apisixRoute{ + groupVersion: ApisixRouteV2beta1, + v2beta1: ar, + }, nil +} + // MustNewApisixRoute creates a kube.ApisixRoute object according to the // type of obj. func MustNewApisixRoute(obj interface{}) ApisixRoute { @@ -156,6 +171,11 @@ func MustNewApisixRoute(obj interface{}) ApisixRoute { groupVersion: ApisixRouteV2alpha1, v2alpha1: ar, } + case *configv2beta1.ApisixRoute: + return &apisixRoute{ + groupVersion: ApisixRouteV2beta1, + v2beta1: ar, + } default: panic("invalid ApisixRoute type") } @@ -181,9 +201,10 @@ func NewApisixRoute(obj interface{}) (ApisixRoute, error) { } } -func NewApisixRouteLister(v1 listersv1.ApisixRouteLister, v2alpha1 listersv2alpha1.ApisixRouteLister) ApisixRouteLister { +func NewApisixRouteLister(v1 listersv1.ApisixRouteLister, v2alpha1 listersv2alpha1.ApisixRouteLister, v2beta1 listersv2beta1.ApisixRouteLister) ApisixRouteLister { return &apisixRouteLister{ v1Lister: v1, v2alpha1Lister: v2alpha1, + v2beta1Lister: v2beta1, } } diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go index a8ea6613dc..3cd4c765ae 100644 --- a/pkg/kube/translation/apisix_route.go +++ b/pkg/kube/translation/apisix_route.go @@ -23,6 +23,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/id" configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" + configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1" "github.com/apache/apisix-ingress-controller/pkg/log" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -133,6 +134,150 @@ func (t *translator) translateHTTPRouteNotStrictly(ctx *TranslateContext, ar *co return nil } +func (t *translator) TranslateRouteV2beta1(ar *configv2beta1.ApisixRoute) (*TranslateContext, error) { + ctx := &TranslateContext{ + upstreamMap: make(map[string]struct{}), + } + + if err := t.translateHTTPRouteV2beta1(ctx, ar); err != nil { + return nil, err + } + if err := t.translateStreamRoute(ctx, ar); err != nil { + return nil, err + } + return ctx, nil +} + +func (t *translator) TranslateRouteV2beta1NotStrictly(ar *configv2beta1.ApisixRoute) (*TranslateContext, error) { + ctx := &TranslateContext{ + upstreamMap: make(map[string]struct{}), + } + + if err := t.translateHTTPRouteV2beta1NotStrictly(ctx, ar); err != nil { + return nil, err + } + if err := t.translateStreamRouteNotStrictly(ctx, ar); err != nil { + return nil, err + } + return ctx, nil +} + +func (t *translator) translateHTTPRouteV2beta1(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error { + ruleNameMap := make(map[string]struct{}) + for _, part := range ar.Spec.HTTP { + if _, ok := ruleNameMap[part.Name]; ok { + return errors.New("duplicated route rule name") + } + ruleNameMap[part.Name] = struct{}{} + backends := part.Backends + backend := part.Backend + if len(backends) > 0 { + // Use the first backend as the default backend in Route, + // others will be configured in traffic-split plugin. + backend = backends[0] + backends = backends[1:] + } // else use the deprecated Backend. + + svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(&backend, ar.Namespace) + if err != nil { + log.Errorw("failed to get service port in backend", + zap.Any("backend", backend), + zap.Any("apisix_route", ar), + zap.Error(err), + ) + return err + } + + pluginMap := make(apisixv1.Plugins) + // add route plugins + for _, plugin := range part.Plugins { + if !plugin.Enable { + continue + } + if plugin.Config != nil { + pluginMap[plugin.Name] = plugin.Config + } else { + pluginMap[plugin.Name] = make(map[string]interface{}) + } + } + + // add KeyAuth and basicAuth plugin + if part.Authentication.Enable { + switch part.Authentication.Type { + case "keyAuth": + pluginMap["key-auth"] = part.Authentication.KeyAuth + case "basicAuth": + pluginMap["basic-auth"] = make(map[string]interface{}) + default: + pluginMap["basic-auth"] = make(map[string]interface{}) + } + } + + var exprs [][]apisixv1.StringOrSlice + if part.Match.NginxVars != nil { + exprs, err = t.translateRouteMatchExprs(part.Match.NginxVars) + if err != nil { + log.Errorw("ApisixRoute with bad nginxVars", + zap.Error(err), + zap.Any("ApisixRoute", ar), + ) + return err + } + } + if err := validateRemoteAddrs(part.Match.RemoteAddrs); err != nil { + log.Errorw("ApisixRoute with invalid remote addrs", + zap.Error(err), + zap.Strings("remote_addrs", part.Match.RemoteAddrs), + zap.Any("ApisixRoute", ar), + ) + return err + } + + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort) + route := apisixv1.NewDefaultRoute() + route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) + route.ID = id.GenID(route.Name) + route.Priority = part.Priority + route.RemoteAddrs = part.Match.RemoteAddrs + route.Vars = exprs + route.Hosts = part.Match.Hosts + route.Uris = part.Match.Paths + route.Methods = part.Match.Methods + route.UpstreamId = id.GenID(upstreamName) + route.EnableWebsocket = part.Websocket + route.Plugins = pluginMap + + if len(backends) > 0 { + weight := _defaultWeight + if backend.Weight != nil { + weight = *backend.Weight + } + backendPoints := make([]*configv2alpha1.ApisixRouteHTTPBackend, 0) + for _, b := range backends { + backendPoints = append(backendPoints, &b) + } + plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backendPoints) + if err != nil { + log.Errorw("failed to translate traffic-split plugin", + zap.Error(err), + zap.Any("ApisixRoute", ar), + ) + return err + } + route.Plugins["traffic-split"] = plugin + } + ctx.addRoute(route) + if !ctx.checkUpstreamExist(upstreamName) { + ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + if err != nil { + return err + } + ctx.addUpstream(ups) + } + } + return nil +} + func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute) error { ruleNameMap := make(map[string]struct{}) for _, part := range ar.Spec.HTTP { @@ -149,7 +294,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha backends = backends[1:] } // else use the deprecated Backend. - svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar) + svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar.Namespace) if err != nil { log.Errorw("failed to get service port in backend", zap.Any("backend", backend), @@ -223,7 +368,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha if backend.Weight != nil { weight = *backend.Weight } - plugin, err := t.translateTrafficSplitPlugin(ctx, ar, weight, backends) + plugin, err := t.translateTrafficSplitPlugin(ctx, ar.Namespace, weight, backends) if err != nil { log.Errorw("failed to translate traffic-split plugin", zap.Error(err), @@ -366,6 +511,41 @@ func (t *translator) translateTCPRouteNotStrictly(ctx *TranslateContext, ar *con return nil } +func (t *translator) translateStreamRoute(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error { + ruleNameMap := make(map[string]struct{}) + for _, part := range ar.Spec.Stream { + if _, ok := ruleNameMap[part.Name]; ok { + return errors.New("duplicated route rule name") + } + ruleNameMap[part.Name] = struct{}{} + backend := part.Backend + svcClusterIP, svcPort, err := t.getStreamServiceClusterIPAndPort(backend, ar.Namespace) + if err != nil { + log.Errorw("failed to get service port in backend", + zap.Any("backend", backend), + zap.Any("apisix_route", ar), + zap.Error(err), + ) + return err + } + sr := apisixv1.NewDefaultStreamRoute() + name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) + sr.ID = id.GenID(name) + sr.ServerPort = part.Match.IngressPort + ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + if err != nil { + return err + } + sr.UpstreamId = ups.ID + ctx.addStreamRoute(sr) + if !ctx.checkUpstreamExist(ups.Name) { + ctx.addUpstream(ups) + } + + } + return nil +} + func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute) error { ruleNameMap := make(map[string]struct{}) for _, part := range ar.Spec.TCP { @@ -400,3 +580,50 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1 } return nil } + +// translateHTTPRouteV2beta1NotStrictly translates http route with a loose way, only generate ID and Name for delete Event. +func (t *translator) translateHTTPRouteV2beta1NotStrictly(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error { + for _, part := range ar.Spec.HTTP { + backends := part.Backends + backend := part.Backend + if len(backends) > 0 { + // Use the first backend as the default backend in Route, + // others will be configured in traffic-split plugin. + backend = backends[0] + } // else use the deprecated Backend. + upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + route := apisixv1.NewDefaultRoute() + route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name) + route.ID = id.GenID(route.Name) + ctx.addRoute(route) + if !ctx.checkUpstreamExist(upstreamName) { + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + if err != nil { + return err + } + ctx.addUpstream(ups) + } + } + return nil +} + +// translateStreamRouteNotStrictly translates tcp route with a loose way, only generate ID and Name for delete Event. +func (t *translator) translateStreamRouteNotStrictly(ctx *TranslateContext, ar *configv2beta1.ApisixRoute) error { + for _, part := range ar.Spec.Stream { + backend := &part.Backend + sr := apisixv1.NewDefaultStreamRoute() + name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) + sr.ID = id.GenID(name) + sr.ServerPort = part.Match.IngressPort + ups, err := t.translateUpstreamNotStrictly(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort.IntVal) + if err != nil { + return err + } + sr.UpstreamId = ups.ID + ctx.addStreamRoute(sr) + if !ctx.checkUpstreamExist(ups.Name) { + ctx.addUpstream(ups) + } + } + return nil +} diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go index 3cc649030f..046f409e66 100644 --- a/pkg/kube/translation/plugin.go +++ b/pkg/kube/translation/plugin.go @@ -27,18 +27,18 @@ var ( _errPasswordNotFoundOrInvalid = errors.New("key \"password\" not found or invalid in secret") ) -func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *configv2alpha1.ApisixRoute, defaultBackendWeight int, +func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ns string, defaultBackendWeight int, backends []*configv2alpha1.ApisixRouteHTTPBackend) (*apisixv1.TrafficSplitConfig, error) { var ( wups []apisixv1.TrafficSplitConfigRuleWeightedUpstream ) for _, backend := range backends { - svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar) + svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ns) if err != nil { return nil, err } - ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.translateUpstream(ns, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return nil, err } diff --git a/pkg/kube/translation/plugin_test.go b/pkg/kube/translation/plugin_test.go index 534bc3cd1f..0072da6b95 100644 --- a/pkg/kube/translation/plugin_test.go +++ b/pkg/kube/translation/plugin_test.go @@ -178,7 +178,7 @@ func TestTranslateTrafficSplitPlugin(t *testing.T) { ctx := &TranslateContext{ upstreamMap: make(map[string]struct{}), } - cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends) + cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, err) assert.Len(t, ctx.Upstreams, 2) @@ -347,7 +347,7 @@ func TestTranslateTrafficSplitPluginWithSameUpstreams(t *testing.T) { ApisixUpstreamLister: auLister, }} ctx := &TranslateContext{upstreamMap: make(map[string]struct{})} - cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends) + cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, err) assert.Len(t, ctx.Upstreams, 1) @@ -511,7 +511,7 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) { ApisixUpstreamLister: auLister, }} ctx := &TranslateContext{upstreamMap: make(map[string]struct{})} - cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends) + cfg, err := tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.Len(t, ctx.Upstreams, 0) assert.Equal(t, err.Error(), "service \"svc-2\" not found") @@ -519,14 +519,14 @@ func TestTranslateTrafficSplitPluginBadCases(t *testing.T) { backends[0].ServiceName = "svc-1" backends[1].ServicePort.StrVal = "port-not-found" ctx = &TranslateContext{upstreamMap: make(map[string]struct{})} - cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends) + cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.Equal(t, err.Error(), "service.spec.ports: port not defined") backends[1].ServicePort.StrVal = "port2" backends[1].ResolveGranularity = "service" ctx = &TranslateContext{upstreamMap: make(map[string]struct{})} - cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1, 30, backends) + cfg, err = tr.translateTrafficSplitPlugin(ctx, ar1.Namespace, 30, backends) assert.Nil(t, cfg) assert.Equal(t, err.Error(), "conflict headless service and backend resolve granularity") } diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go index e642d43abb..c1f6ac985c 100644 --- a/pkg/kube/translation/translator.go +++ b/pkg/kube/translation/translator.go @@ -24,6 +24,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/kube" configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" + configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1" listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" @@ -73,6 +74,12 @@ type Translator interface { // TranslateRouteV2alpha1NotStrictly translates the configv2alpha1.ApisixRoute object into several Route // and Upstream resources not strictly, only used for delete event. TranslateRouteV2alpha1NotStrictly(*configv2alpha1.ApisixRoute) (*TranslateContext, error) + // TranslateRouteV2beta1 translates the configv2beta1.ApisixRoute object into several Route + // and Upstream resources. + TranslateRouteV2beta1(*configv2beta1.ApisixRoute) (*TranslateContext, error) + // TranslateRouteV2beta1NotStrictly translates the configv2beta1.ApisixRoute object into several Route + // and Upstream resources not strictly, only used for delete event. + TranslateRouteV2beta1NotStrictly(*configv2beta1.ApisixRoute) (*TranslateContext, error) // TranslateSSL translates the configv2alpha1.ApisixTls object into the APISIX SSL resource. TranslateSSL(*configv1.ApisixTls) (*apisixv1.Ssl, error) // TranslateClusterConfig translates the configv2alpha1.ApisixClusterConfig object into the APISIX diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go index c57d80932b..88ba6f90a7 100644 --- a/pkg/kube/translation/util.go +++ b/pkg/kube/translation/util.go @@ -23,6 +23,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/id" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" + configv2beta1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta1" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" @@ -32,12 +33,19 @@ var ( _errInvalidAddress = errors.New("address is neither IP or CIDR") ) -func (t *translator) getServiceClusterIPAndPort(backend *configv2alpha1.ApisixRouteHTTPBackend, ar *configv2alpha1.ApisixRoute) (string, int32, error) { - svc, err := t.ServiceLister.Services(ar.Namespace).Get(backend.ServiceName) +func (t *translator) getServiceClusterIPAndPort(backend *configv2alpha1.ApisixRouteHTTPBackend, ns string) (string, int32, error) { + svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) if err != nil { return "", 0, err } svcPort := int32(-1) + if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { + log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", + zap.Any("namespace", ns), + zap.Any("service", svc), + ) + return "", 0, errors.New("conflict headless service and backend resolve granularity") + } loop: for _, port := range svc.Spec.Ports { switch backend.ServicePort.Type { @@ -55,19 +63,12 @@ loop: } if svcPort == -1 { log.Errorw("ApisixRoute refers to non-existent Service port", - zap.Any("ApisixRoute", ar), + zap.String("namespace", ns), zap.String("port", backend.ServicePort.String()), ) return "", 0, err } - if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { - log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.Any("ApisixRoute", ar), - zap.Any("service", svc), - ) - return "", 0, errors.New("conflict headless service and backend resolve granularity") - } return svc.Spec.ClusterIP, svcPort, nil } @@ -77,6 +78,13 @@ func (t *translator) getTCPServiceClusterIPAndPort(backend *configv2alpha1.Apisi return "", 0, err } svcPort := int32(-1) + if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { + log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", + zap.Any("ApisixRoute", ar), + zap.Any("service", svc), + ) + return "", 0, errors.New("conflict headless service and backend resolve granularity") + } loop: for _, port := range svc.Spec.Ports { switch backend.ServicePort.Type { @@ -100,13 +108,46 @@ loop: return "", 0, err } + return svc.Spec.ClusterIP, svcPort, nil +} + +// getStreamServiceClusterIPAndPort is for v2beta1 streamRoute +func (t *translator) getStreamServiceClusterIPAndPort(backend configv2beta1.ApisixRouteStreamBackend, ns string) (string, int32, error) { + svc, err := t.ServiceLister.Services(ns).Get(backend.ServiceName) + if err != nil { + return "", 0, err + } + svcPort := int32(-1) if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity", - zap.Any("ApisixRoute", ar), + zap.String("ApisixRoute namespace", ns), zap.Any("service", svc), ) return "", 0, errors.New("conflict headless service and backend resolve granularity") } +loop: + for _, port := range svc.Spec.Ports { + switch backend.ServicePort.Type { + case intstr.Int: + if backend.ServicePort.IntVal == port.Port { + svcPort = port.Port + break loop + } + case intstr.String: + if backend.ServicePort.StrVal == port.Name { + svcPort = port.Port + break loop + } + } + } + if svcPort == -1 { + log.Errorw("ApisixRoute refers to non-existent Service port", + zap.String("ApisixRoute namespace", ns), + zap.String("port", backend.ServicePort.String()), + ) + return "", 0, err + } + return svc.Spec.ClusterIP, svcPort, nil } diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml index 3b8a377804..bb63b39b12 100644 --- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml +++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml @@ -47,10 +47,10 @@ spec: deprecated: true - name: v2alpha1 served: true - storage: true + storage: false - name: v2beta1 served: true - storage: false + storage: true scope: Namespaced names: plural: apisixroutes diff --git a/test/e2e/ingress/stream.go b/test/e2e/ingress/stream.go new file mode 100644 index 0000000000..d57cbc4c89 --- /dev/null +++ b/test/e2e/ingress/stream.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingress + +import ( + "context" + "fmt" + "time" + + "github.com/onsi/ginkgo" + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = ginkgo.Describe("ApisixRoute stream Testing with v2beta1", func() { + opts := &scaffold.Options{ + Name: "default", + Kubeconfig: scaffold.GetKubeconfig(), + APISIXConfigPath: "testdata/apisix-gw-config.yaml", + IngressAPISIXReplicas: 1, + HTTPBinServicePort: 80, + APISIXRouteVersion: "apisix.apache.org/v2beta1", + } + s := scaffold.NewScaffold(opts) + ginkgo.It("stream tcp proxy", func() { + backendSvc, backendSvcPort := s.DefaultHTTPBackend() + apisixRoute := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2beta1 +kind: ApisixRoute +metadata: + name: httpbin-tcp-route +spec: + stream: + - name: rule1 + protocol: TCP + match: + ingressPort: 9100 + backend: + serviceName: %s + servicePort: %d +`, backendSvc, backendSvcPort[0]) + + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute)) + time.Sleep(9 * time.Second) + + err := s.EnsureNumApisixStreamRoutesCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes") + + sr, err := s.ListApisixStreamRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), sr, 1) + assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9100)) + + resp := s.NewAPISIXClientWithTCPProxy().GET("/ip").Expect() + resp.Body().Contains("origin") + + resp = s.NewAPISIXClientWithTCPProxy().GET("/get").WithHeader("x-my-header", "x-my-value").Expect() + resp.Body().Contains("x-my-value") + }) + ginkgo.It("stream udp proxy", func() { + apisixRoute := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2beta1 +kind: ApisixRoute +metadata: + name: httpbin-udp-route +spec: + stream: + - name: rule1 + protocol: UDP + match: + ingressPort: 9200 + backend: + serviceName: kube-dns + servicePort: 53 +`) + // update namespace only for this case + s.UpdateNamespace("kube-system") + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute)) + time.Sleep(9 * time.Second) + + err := s.EnsureNumApisixStreamRoutesCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes") + + sr, err := s.ListApisixStreamRoutes() + assert.Nil(ginkgo.GinkgoT(), err) + assert.Len(ginkgo.GinkgoT(), sr, 1) + assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9200)) + // test dns query + r := s.DNSResolver() + host := "httpbin.org" + _, err = r.LookupIPAddr(context.Background(), host) + assert.Nil(ginkgo.GinkgoT(), err, "dns query error") + }) +}) diff --git a/test/e2e/scaffold/apisix.go b/test/e2e/scaffold/apisix.go index 6c5f5195f8..53b01bbdd0 100644 --- a/test/e2e/scaffold/apisix.go +++ b/test/e2e/scaffold/apisix.go @@ -119,6 +119,10 @@ spec: port: 9100 protocol: TCP targetPort: 9100 + - name: udp + port: 9200 + protocol: UDP + targetPort: 9200 - name: http-control port: 9090 protocol: TCP diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go index 2784884a14..578907a860 100644 --- a/test/e2e/scaffold/ingress.go +++ b/test/e2e/scaffold/ingress.go @@ -253,7 +253,7 @@ spec: - --default-apisix-cluster-admin-key - edd1c9f034335f136f87ad84b625c8f1 - --app-namespace - - %s + - %s,kube-system - --apisix-route-version - %s serviceAccount: ingress-apisix-e2e-test-service-account diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index 4517bda78c..941ec41ea9 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -359,11 +359,13 @@ func (s *Scaffold) newAPISIXTunnels() error { httpNodePort int httpsNodePort int tcpNodePort int + udpNodePort int controlNodePort int adminPort int httpPort int httpsPort int tcpPort int + udpPort int controlPort int ) for _, port := range s.apisixService.Spec.Ports { @@ -379,6 +381,9 @@ func (s *Scaffold) newAPISIXTunnels() error { } else if port.Name == "tcp" { tcpNodePort = int(port.NodePort) tcpPort = int(port.Port) + } else if port.Name == "udp" { + udpNodePort = int(port.NodePort) + udpPort = int(port.Port) } else if port.Name == "http-control" { controlNodePort = int(port.NodePort) controlPort = int(port.Port) @@ -393,6 +398,8 @@ func (s *Scaffold) newAPISIXTunnels() error { httpsNodePort, httpsPort) s.apisixTCPTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test", tcpNodePort, tcpPort) + s.apisixUDPTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test", + udpNodePort, udpPort) s.apisixControlTunnel = k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-service-e2e-test", controlNodePort, controlPort) @@ -412,6 +419,10 @@ func (s *Scaffold) newAPISIXTunnels() error { return err } s.addFinalizers(s.apisixTCPTunnel.Close) + if err := s.apisixUDPTunnel.ForwardPortE(s.t); err != nil { + return err + } + s.addFinalizers(s.apisixUDPTunnel.Close) if err := s.apisixControlTunnel.ForwardPortE(s.t); err != nil { return err } diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index a1c8dd24f5..7e85ec8e8a 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -20,6 +20,7 @@ import ( "crypto/x509" "fmt" "io/ioutil" + "net" "net/http" "net/url" "os" @@ -67,6 +68,7 @@ type Scaffold struct { apisixHttpTunnel *k8s.Tunnel apisixHttpsTunnel *k8s.Tunnel apisixTCPTunnel *k8s.Tunnel + apisixUDPTunnel *k8s.Tunnel apisixControlTunnel *k8s.Tunnel // Used for template rendering. @@ -205,6 +207,22 @@ func (s *Scaffold) NewAPISIXClientWithTCPProxy() *httpexpect.Expect { }) } +func (s *Scaffold) DNSResolver() *net.Resolver { + return &net.Resolver{ + PreferGo: false, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{ + Timeout: time.Millisecond * time.Duration(10000), + } + return d.DialContext(ctx, "udp", s.apisixUDPTunnel.Endpoint()) + }, + } +} + +func (s *Scaffold) UpdateNamespace(ns string) { + s.kubectlOptions.Namespace = ns +} + // NewAPISIXHttpsClient creates the default HTTPS client. func (s *Scaffold) NewAPISIXHttpsClient(host string) *httpexpect.Expect { u := url.URL{ diff --git a/test/e2e/testdata/apisix-gw-config.yaml b/test/e2e/testdata/apisix-gw-config.yaml index 4b75474249..3598f8e4c5 100644 --- a/test/e2e/testdata/apisix-gw-config.yaml +++ b/test/e2e/testdata/apisix-gw-config.yaml @@ -29,6 +29,8 @@ apisix: stream_proxy: # TCP/UDP proxy tcp: # TCP proxy port list - 9100 + udp: + - 9200 etcd: host: # it's possible to define multiple etcd hosts addresses of the same etcd cluster. - "http://{{ .EtcdServiceFQDN }}:2379" # multiple etcd address