Skip to content

Commit

Permalink
e2e: gateway api httproute (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel authored Jun 8, 2022
1 parent 2af39c9 commit 3520830
Show file tree
Hide file tree
Showing 21 changed files with 4,043 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Gateway API CRDs:
# https://github.com/kubernetes-sigs/gateway-api/tree/66a7c8ffc942a90294612f63c6dedcc73a3de5ca/config/crd/stable
samples/deploy/gateway-api/* linguist-generated=true
2 changes: 2 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ header:
- 'utils/generate-groups.sh'
- '**/*.pem'
- '**/*.key'
- 'samples/deploy/gateway-api/**'
- '.gitattributes'
comment: on-failure
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ unit-test:
.PHONY: e2e-test
e2e-test: ginkgo-check push-images
kubectl apply -k $(PWD)/samples/deploy/crd
kubectl apply -f $(PWD)/samples/deploy/gateway-api
cd test/e2e \
&& go mod download \
&& export REGISTRY=$(REGISTRY) \
Expand Down
13 changes: 8 additions & 5 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,23 +665,26 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consum
}

func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
namespace := ep.Namespace()
namespace, err := ep.Namespace()
if err != nil {
return err
}
svcName := ep.ServiceName()
svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
svc, err := c.svcLister.Services(namespace).Get(svcName)
if err != nil {
if k8serrors.IsNotFound(err) {
log.Infof("service %s/%s not found", ep.Namespace(), svcName)
log.Infof("service %s/%s not found", namespace, svcName)
return nil
}
log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
return err
}
var subsets []configv2beta3.ApisixUpstreamSubset
subsets = append(subsets, configv2beta3.ApisixUpstreamSubset{})
au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
return err
}
} else if au.Spec != nil && len(au.Spec.Subsets) > 0 {
Expand Down
6 changes: 5 additions & 1 deletion pkg/ingress/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func (c *endpointsController) run(ctx context.Context) {

func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
ep := ev.Object.(kube.Endpoint)
newestEp, err := c.controller.epLister.GetEndpoint(ep.Namespace(), ep.ServiceName())
ns, err := ep.Namespace()
if err != nil {
return err
}
newestEp, err := c.controller.epLister.GetEndpoint(ns, ep.ServiceName())
if err != nil {
if !errors.IsNotFound(err) {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingress/gateway_httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (c *gatewayHTTPRouteController) sync(ctx context.Context, ev *types.Event)
return err
}

log.Debugw("sync HTTPRoute", zap.String("key", key))

httpRoute, err := c.controller.gatewayHttpRouteLister.HTTPRoutes(namespace).Get(name)
if err != nil {
if !k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -207,6 +209,7 @@ func (c *gatewayHTTPRouteController) onAdd(obj interface{}) {
zap.Any("object", obj),
)

log.Debugw("add HTTPRoute", zap.String("key", key))
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
Expand Down
55 changes: 48 additions & 7 deletions pkg/kube/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package kube

import (
"errors"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
listerscorev1 "k8s.io/client-go/listers/core/v1"
listersdiscoveryv1 "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/apisix-ingress-controller/pkg/log"
)

type HostPort struct {
Expand Down Expand Up @@ -53,8 +58,15 @@ func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, err
if err != nil {
return nil, err
}
if ep == nil {
log.Warnw("get endpoints but found nil",
zap.String("namespace", namespace),
zap.String("name", name),
)
}
return &endpoint{
endpoint: ep,
endpointType: endpointTypeEndpoints,
endpoint: ep,
}, nil
}

Expand All @@ -69,7 +81,15 @@ func (lister *endpointLister) GetEndpointSlices(namespace, svcName string) (Endp
if err != nil {
return nil, err
}
if len(eps) == 0 {
log.Warnw("get endpoint slices but found empty slice",
zap.String("namespace", namespace),
zap.String("service", svcName),
zap.Any("selector", selector),
)
}
return &endpoint{
endpointType: endpointTypeEndpointSlices,
endpointSlices: eps,
}, nil
}
Expand All @@ -79,12 +99,20 @@ type Endpoint interface {
// ServiceName returns the corresponding service owner of this endpoint.
ServiceName() string
// Namespace returns the residing namespace.
Namespace() string
Namespace() (string, error)
// Endpoints returns the corresponding endpoints which matches the ServicePort.
Endpoints(port *corev1.ServicePort) []HostPort
}

type endpointType string

const (
endpointTypeEndpoints endpointType = "Endpoint"
endpointTypeEndpointSlices endpointType = "EndpointSlices"
)

type endpoint struct {
endpointType endpointType
endpoint *corev1.Endpoints
endpointSlices []*discoveryv1.EndpointSlice
}
Expand All @@ -96,11 +124,22 @@ func (e *endpoint) ServiceName() string {
return e.endpointSlices[0].Labels[discoveryv1.LabelServiceName]
}

func (e *endpoint) Namespace() string {
if e.endpoint != nil {
return e.endpoint.Namespace
func (e *endpoint) Namespace() (string, error) {
switch e.endpointType {
case endpointTypeEndpointSlices:
if len(e.endpointSlices) > 0 {
return e.endpointSlices[0].Namespace, nil
} else {
return "", errors.New("endpoint slice is empty")
}
case endpointTypeEndpoints:
if e.endpoint != nil {
return e.endpoint.Namespace, nil
} else {
return "", errors.New("endpoint is nil")
}
}
return e.endpointSlices[0].Namespace
return "", errors.New("unknown endpoint type " + string(e.endpointType))
}

func (e *endpoint) Endpoints(svcPort *corev1.ServicePort) []HostPort {
Expand Down Expand Up @@ -171,13 +210,15 @@ func NewEndpointListerAndInformer(factory informers.SharedInformerFactory, useEn
// NewEndpoint creates an Endpoint which entity is Kubernetes Endpoints.
func NewEndpoint(ep *corev1.Endpoints) Endpoint {
return &endpoint{
endpoint: ep,
endpointType: endpointTypeEndpoints,
endpoint: ep,
}
}

// NewEndpointWithSlice creates an Endpoint which entity is Kubernetes EndpointSlices.
func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint {
return &endpoint{
endpointType: endpointTypeEndpointSlices,
endpointSlices: []*discoveryv1.EndpointSlice{ep},
}
}
31 changes: 28 additions & 3 deletions pkg/kube/translation/gateway_httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,48 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha
for j, backend := range backends {
//TODO: Support filters
//filters := backend.Filters
kind := strings.ToLower(string(*backend.Kind))
var kind string
if backend.Kind == nil {
kind = "service"
} else {
kind = strings.ToLower(string(*backend.Kind))
}
if kind != "service" {
log.Warnw(fmt.Sprintf("ignore non-service kind at Rules[%v].BackendRefs[%v]", i, j),
zap.String("kind", kind),
)
continue
}

ns := string(*backend.Namespace)
var ns string
if backend.Namespace == nil {
ns = httpRoute.Namespace
} else {
ns = string(*backend.Namespace)
}
//if ns != httpRoute.Namespace {
// TODO: check gatewayv1alpha2.ReferencePolicy
//}

if backend.Port == nil {
log.Warnw(fmt.Sprintf("ignore nil port at Rules[%v].BackendRefs[%v]", i, j),
zap.String("kind", kind),
)
continue
}

ups, err := t.TranslateUpstream(ns, string(backend.Name), "", int32(*backend.Port))
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].BackendRefs[%v]", i, j))
}
name := apisixv1.ComposeUpstreamName(ns, string(backend.Name), "", int32(*backend.Port))
ups.Labels["id-name"] = name

// APISIX limits max length of label value
// https://github.com/apache/apisix/blob/5b95b85faea3094d5e466ee2d39a52f1f805abbb/apisix/schema_def.lua#L85
ups.Labels["meta_namespace"] = truncate(ns, 64)
ups.Labels["meta_backend"] = truncate(string(backend.Name), 64)
ups.Labels["meta_port"] = fmt.Sprintf("%v", int32(*backend.Port))

ups.ID = id.GenID(name)
ctx.addUpstream(ups)
ruleUpstreams = append(ruleUpstreams, ups)
Expand Down Expand Up @@ -114,6 +137,8 @@ func (t *translator) TranslateGatewayHTTPRouteV1Alpha2(httpRoute *gatewayv1alpha
return nil, errors.Wrap(err, fmt.Sprintf("failed to translate Rules[%v].Matches[%v]", i, j))
}

name := apisixv1.ComposeRouteName(httpRoute.Namespace, httpRoute.Name, fmt.Sprintf("%d-%d", i, j))
route.ID = id.GenID(name)
route.Hosts = hosts

// Bind Upstream
Expand Down
11 changes: 10 additions & 1 deletion pkg/kube/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package translation
import (
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
listerscorev1 "k8s.io/client-go/listers/core/v1"
Expand All @@ -27,6 +28,7 @@ import (
configv2beta2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
listersv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2beta3"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
Expand Down Expand Up @@ -230,7 +232,14 @@ func (t *translator) TranslateUpstream(namespace, name, subset string, port int3
}

func (t *translator) TranslateUpstreamNodes(endpoint kube.Endpoint, port int32, labels types.Labels) (apisixv1.UpstreamNodes, error) {
namespace := endpoint.Namespace()
namespace, err := endpoint.Namespace()
if err != nil {
log.Errorw("failed to get endpoint namespace",
zap.Error(err),
zap.Any("endpoint", endpoint),
)
return nil, err
}
svcName := endpoint.ServiceName()
svc, err := t.ServiceLister.Services(namespace).Get(svcName)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/kube/translation/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,10 @@ func validateRemoteAddrs(remoteAddrs []string) error {
}
return nil
}

func truncate(s string, max int) string {
if max > len(s) || max < 0 {
return s
}
return s[:max]
}
Loading

0 comments on commit 3520830

Please sign in to comment.