Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: traffic split #308

Merged
merged 6 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: traffic split
  • Loading branch information
tokers committed Mar 19, 2021
commit d08cfecde2e4aeefdc335ed81ca7da47ff088d02
15 changes: 11 additions & 4 deletions pkg/kube/apisix/apis/config/v2alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ type ApisixRouteHTTP struct {
// Route priority, when multiple routes contains
// same URI path (for path matching), route with
// higher priority will take effect.
Priority int `json:"priority,omitempty"`
Match *ApisixRouteHTTPMatch `json:"match,omitempty"`
Backend *ApisixRouteHTTPBackend `json:"backend"`
Plugins []*ApisixRouteHTTPPlugin `json:"plugins,omitempty"`
Priority int `json:"priority,omitempty"`
Match *ApisixRouteHTTPMatch `json:"match,omitempty"`
// Deprecated: Backend will be removed in the future, use Backends instead.
Backend *ApisixRouteHTTPBackend `json:"backend"`
// Backends represents potential backends to proxy after the route
// rule matched. When number of backends are more than one, traffic-split
// plugin in APISIX will be used to split traffic based on the backend weight.
Backends []*ApisixRouteHTTPBackend `json:"backends"`
Plugins []*ApisixRouteHTTPPlugin `json:"plugins,omitempty"`
}

// ApisixRouteHTTPMatch represents the match condition for hitting this route.
Expand Down Expand Up @@ -137,6 +142,8 @@ type ApisixRouteHTTPBackend struct {
// wise, the service ClusterIP or ExternalIP will be used,
// default is endpoints.
ResolveGranularity string `json:"resolveGranularity"`
// Weight of this backend.
Weight int `json:"weight"`
}

// ApisixRouteHTTPPlugin represents an APISIX plugin.
Expand Down
11 changes: 11 additions & 0 deletions pkg/kube/apisix/apis/config/v2alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 47 additions & 49 deletions pkg/kube/translation/apisix_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"errors"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/apache/apisix-ingress-controller/pkg/id"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
Expand Down Expand Up @@ -59,7 +58,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) ([]*apisixv1.Rou
pluginMap[plugin.Name] = make(map[string]interface{})
}
}

upsId := id.GenID(upstreamName)
route := &apisixv1.Route{
Metadata: apisixv1.Metadata{
FullName: routeName,
Expand All @@ -69,7 +68,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) ([]*apisixv1.Rou
Host: r.Host,
Path: p.Path,
UpstreamName: upstreamName,
UpstreamId: id.GenID(upstreamName),
UpstreamId: upsId,
Plugins: pluginMap,
}
routes = append(routes, route)
Expand All @@ -80,6 +79,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) ([]*apisixv1.Rou
return nil, nil, err
}
ups.FullName = upstreamName
ups.ID = upsId
ups.ResourceVersion = ar.ResourceVersion
ups.Name = upstreamName
upstreamMap[ups.FullName] = ups
Expand Down Expand Up @@ -108,40 +108,29 @@ func (t *translator) TranslateRouteV2alpha1(ar *configv2alpha1.ApisixRoute) ([]*
if len(part.Match.Paths) < 1 {
return nil, nil, errors.New("empty route paths match")
}
svc, err := t.ServiceLister.Services(ar.Namespace).Get(part.Backend.ServiceName)
if err != nil {
return nil, nil, err
if part.Backend != nil && len(part.Backends) > 1 {
return nil, nil, errors.New("backend and backends are exclusive")
}
svcPort := int32(-1)
loop:
for _, port := range svc.Spec.Ports {
switch part.Backend.ServicePort.Type {
case intstr.Int:
if part.Backend.ServicePort.IntVal == port.Port {
svcPort = port.Port
break loop
}
case intstr.String:
if part.Backend.ServicePort.StrVal == port.Name {
svcPort = port.Port
break loop
}
}
}
if svcPort == -1 {
log.Errorw("ApisixRoute refers to non-existent Service port",
zap.Any("ApisixRoute", ar),
zap.String("port", part.Backend.ServicePort.String()),
)
return nil, nil, err
if part.Backend == nil && len(part.Backends) == 0 {
return nil, nil, errors.New("no specified backend")
}
backends := part.Backends
backend := part.Backend
if len(backends) > 1 {
// Use the first backend as the default backend in Route,
// others will be configured in traffic-split plugin.
backend = backends[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused here, backend will be covered if backends is not empty ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, already check the exclusive relationship before this, when backends not empty, backend is nil.

backends = backends[1:]
} // else use the deprecated Backend.

if part.Backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
zap.Any("ApisixRoute", ar),
zap.Any("service", svc),
svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar)
if err != nil {
log.Errorw("failed to get service port in backend",
zap.Any("backend", backend),
zap.Any("apisix_route", ar),
zap.Error(err),
)
return nil, nil, errors.New("conflict headless service and backend resolve granularity")
return nil, nil, err
}

pluginMap := make(apisixv1.Plugins)
Expand All @@ -168,7 +157,8 @@ func (t *translator) TranslateRouteV2alpha1(ar *configv2alpha1.ApisixRoute) ([]*
}

routeName := apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, part.Backend.ServiceName, svcPort)
upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, svcPort)
upsId := id.GenID(upstreamName)
route := &apisixv1.Route{
Metadata: apisixv1.Metadata{
FullName: routeName,
Expand All @@ -181,29 +171,37 @@ func (t *translator) TranslateRouteV2alpha1(ar *configv2alpha1.ApisixRoute) ([]*
Uris: part.Match.Paths,
Methods: part.Match.Methods,
UpstreamName: upstreamName,
UpstreamId: id.GenID(upstreamName),
UpstreamId: upsId,
Plugins: pluginMap,
}

routes = append(routes, route)

if _, ok := upstreamMap[upstreamName]; !ok {
ups, err := t.TranslateUpstream(ar.Namespace, part.Backend.ServiceName, svcPort)
if len(backends) > 0 {
weight := _defaultWeight
if backend.Weight != 0 {
weight = backend.Weight
}
ups, plugin, err := t.translateTrafficSplitPlugin(ar, weight, backends)
if err != nil {
log.Errorw("failed to translate traffic-split plugin",
zap.Error(err),
zap.Any("ApisixRoute", ar),
)
return nil, nil, err
}
if part.Backend.ResolveGranularity == "service" {
ups.Nodes = []apisixv1.UpstreamNode{
{
IP: svc.Spec.ClusterIP,
Port: int(svcPort),
Weight: _defaultWeight,
},
for _, u := range ups {
if _, ok := upstreamMap[u.FullName]; !ok {
upstreamMap[u.FullName] = u
}
}
ups.FullName = upstreamName
ups.ResourceVersion = ar.ResourceVersion
ups.Name = upstreamName
route.Plugins["traffic-split"] = plugin
}

routes = append(routes, route)
if _, ok := upstreamMap[upstreamName]; !ok {
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, nil, err
}
upstreamMap[ups.FullName] = ups
}
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/kube/translation/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package translation

import (
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

func (t *translator) translateTrafficSplitPlugin(ar *configv2alpha1.ApisixRoute, defaultBackendWeight int,
backends []*configv2alpha1.ApisixRouteHTTPBackend) ([]*apisixv1.Upstream, *apisixv1.TrafficSplitConfig, error) {
var (
upstreams []*apisixv1.Upstream
wups []apisixv1.TrafficSplitConfigRuleWeightedUpstream
)

for _, backend := range backends {
svcClusterIP, svcPort, err := t.getServiceClusterIPAndPort(backend, ar)
if err != nil {
return nil, nil, err
}
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, nil, err
}
upstreams = append(upstreams, ups)

weight := _defaultWeight
if backend.Weight != 0 {
weight = backend.Weight
}
wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
UpstreamID: ups.ID,
Weight: weight,
})
}

// Finally append the default upstream in the route.
wups = append(wups, apisixv1.TrafficSplitConfigRuleWeightedUpstream{
Weight: defaultBackendWeight,
})

tsCfg := &apisixv1.TrafficSplitConfig{
Rules: []apisixv1.TrafficSplitConfigRule{
{
WeightedUpstreams: wups,
},
},
}
return upstreams, tsCfg, nil
}
87 changes: 87 additions & 0 deletions pkg/kube/translation/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package translation

import (
"errors"

"github.com/apache/apisix-ingress-controller/pkg/id"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"

configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
)

func (t *translator) getServiceClusterIPAndPort(backend *configv2alpha1.ApisixRouteHTTPBackend, ar *configv2alpha1.ApisixRoute) (string, int32, error) {
svc, err := t.ServiceLister.Services(ar.Namespace).Get(backend.ServiceName)
if err != nil {
return "", 0, err
}
svcPort := int32(-1)
loop:
for _, port := range svc.Spec.Ports {
switch backend.ServicePort.Type {
case intstr.Int:
if backend.ServicePort.IntVal == port.Port {
svcPort = port.Port
break loop
}
case intstr.String:
if backend.ServicePort.StrVal == port.Name {
svcPort = port.Port
break loop
}
}
}
if svcPort == -1 {
log.Errorw("ApisixRoute refers to non-existent Service port",
zap.Any("ApisixRoute", ar),
zap.String("port", backend.ServicePort.String()),
)
return "", 0, err
}

if backend.ResolveGranularity == "service" && svc.Spec.ClusterIP == "" {
log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
zap.Any("ApisixRoute", ar),
zap.Any("service", svc),
)
return "", 0, errors.New("conflict headless service and backend resolve granularity")
}
return svc.Spec.ClusterIP, svcPort, nil
}

func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
if err != nil {
return nil, err
}
if svcResolveGranularity == "service" {
ups.Nodes = []apisixv1.UpstreamNode{
{
IP: svcClusterIP,
Port: int(svcPort),
Weight: _defaultWeight,
},
}
}
ups.FullName = apisixv1.ComposeUpstreamName(namespace, svcName, svcPort)
ups.Name = ups.FullName
ups.ID = id.GenID(ups.FullName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If upstream.ID is nil, what is the behavior of the controller ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update of Upstream will fail in APISIX side since the necessary ID is not provided by the caller.

return ups, nil
}
18 changes: 18 additions & 0 deletions pkg/types/apisix/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,24 @@ type Ssl struct {
Group string `json:"group,omitempty" yaml:"group,omitempty"`
}

// TrafficSplitConfig is the config of traffic-split plugin.
// +k8s:deepcopy-gen=true
type TrafficSplitConfig struct {
Rules []TrafficSplitConfigRule `json:"rules"`
}

// TrafficSplitConfigRule is the rule config in traffic-split plugin config.
type TrafficSplitConfigRule struct {
WeightedUpstreams []TrafficSplitConfigRuleWeightedUpstream `json:"weighted_upstreams"`
}

// TrafficSplitConfigRuleWeightedUpstream is the weighted upstream config in
// the traffic split plugin rule.
type TrafficSplitConfigRuleWeightedUpstream struct {
UpstreamID string `json:"upstream_id,omitempty"`
Weight int `json:"weight"`
}

// NewDefaultUpstream returns an empty Upstream with default values.
func NewDefaultUpstream() *Upstream {
return &Upstream{
Expand Down