feat(meshmultizoneservice): vip allocation #10667

Merged
3 changes: 3 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
@@ -813,5 +813,8 @@ ipam:
  meshExternalService:
    # CIDR for MeshExternalService IPs
    cidr: 242.0.0.0/8 # ENV: KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR
  meshMultiZoneService:
    # CIDR for MeshMultiZoneService IPs
    cidr: 243.0.0.0/8 # ENV: KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR
  # Interval on which Kuma will allocate new IPs for MeshServices, MeshExternalServices and MeshMultiZoneServices
  allocationInterval: 5s # ENV: KUMA_IPAM_ALLOCATION_INTERVAL
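As a sanity check on the new range: a /8 prefix leaves 24 host bits, which is where the "16M addresses" figure in the allocator comment below comes from. A minimal standard-library sketch:

package main

import (
	"fmt"
	"net"
)

func main() {
	// parse the default MeshMultiZoneService CIDR and count its addresses
	_, ipNet, err := net.ParseCIDR("243.0.0.0/8")
	if err != nil {
		panic(err)
	}
	ones, bits := ipNet.Mask.Size() // 8 network bits out of 32
	fmt.Println(1 << (bits - ones)) // 16777216 possible VIPs
}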
20 changes: 18 additions & 2 deletions pkg/config/app/kuma-cp/config.go
@@ -287,6 +287,9 @@ var DefaultConfig = func() Config {
MeshExternalService: MeshExternalServiceIPAM{
CIDR: "242.0.0.0/8",
},
MeshMultiZoneService: MeshMultiZoneServiceIPAM{
CIDR: "243.0.0.0/8",
},
AllocationInterval: config_types.Duration{Duration: 5 * time.Second},
},
}
@@ -469,8 +472,9 @@ type ExperimentalKDSEventBasedWatchdog struct {
}

type IPAMConfig struct {
MeshService MeshServiceIPAM `json:"meshService"`
MeshExternalService MeshExternalServiceIPAM `json:"meshExternalService"`
MeshMultiZoneService MeshMultiZoneServiceIPAM `json:"meshMultiZoneService"`
// Interval on which Kuma will allocate new IPs and generate hostnames.
AllocationInterval config_types.Duration `json:"allocationInterval" envconfig:"KUMA_IPAM_ALLOCATION_INTERVAL"`
}
@@ -515,3 +519,15 @@ func (c Config) GetEnvoyAdminPort() uint32 {
}
return c.BootstrapServer.Params.AdminPort
}

type MeshMultiZoneServiceIPAM struct {
// CIDR for MeshMultiZoneService IPs
CIDR string `json:"cidr" envconfig:"KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR"`
}

func (i MeshMultiZoneServiceIPAM) Validate() error {
if _, _, err := net.ParseCIDR(i.CIDR); err != nil {
return errors.Wrap(err, ".MeshMultiZoneServiceCIDR is invalid")
}
return nil
}
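Validation only requires that the configured value parses as a CIDR. A standalone sketch mirroring the check above (the check is inlined here rather than importing the kuma config package):

package main

import (
	"fmt"
	"net"
)

// validateCIDR mirrors MeshMultiZoneServiceIPAM.Validate: reject anything
// that net.ParseCIDR cannot parse
func validateCIDR(cidr string) error {
	if _, _, err := net.ParseCIDR(cidr); err != nil {
		return fmt.Errorf(".MeshMultiZoneServiceCIDR is invalid: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(validateCIDR("243.0.0.0/8")) // <nil>
	fmt.Println(validateCIDR("243.0.0.0"))   // error (no prefix length)
}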
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
@@ -813,5 +813,8 @@ ipam:
  meshExternalService:
    # CIDR for MeshExternalService IPs
    cidr: 242.0.0.0/8 # ENV: KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR
  meshMultiZoneService:
    # CIDR for MeshMultiZoneService IPs
    cidr: 243.0.0.0/8 # ENV: KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR
  # Interval on which Kuma will allocate new IPs for MeshServices, MeshExternalServices and MeshMultiZoneServices
  allocationInterval: 5s # ENV: KUMA_IPAM_ALLOCATION_INTERVAL
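The # ENV: comments correspond to envconfig tags on the config structs. A sketch of how such a tag resolves an override, using a hypothetical mirror struct and the kelseyhightower/envconfig library (kuma wires this through its own config loader, exercised in the tests below):

package main

import (
	"fmt"
	"os"

	"github.com/kelseyhightower/envconfig"
)

// hypothetical mirror of MeshMultiZoneServiceIPAM, reduced to the env tag
type meshMultiZoneServiceIPAM struct {
	CIDR string `envconfig:"KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR" default:"243.0.0.0/8"`
}

func main() {
	os.Setenv("KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR", "253.0.0.0/8")
	var cfg meshMultiZoneServiceIPAM
	if err := envconfig.Process("", &cfg); err != nil { // empty prefix, tag carries the full name
		panic(err)
	}
	fmt.Println(cfg.CIDR) // 253.0.0.0/8
}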
4 changes: 4 additions & 0 deletions pkg/config/loader_test.go
@@ -375,6 +375,7 @@ var _ = Describe("Config loader", func() {

Expect(cfg.IPAM.MeshService.CIDR).To(Equal("251.0.0.0/8"))
Expect(cfg.IPAM.MeshExternalService.CIDR).To(Equal("252.0.0.0/8"))
Expect(cfg.IPAM.MeshMultiZoneService.CIDR).To(Equal("253.0.0.0/8"))
Expect(cfg.IPAM.AllocationInterval.Duration).To(Equal(7 * time.Second))

Expect(cfg.CoreResources.Enabled).To(Equal([]string{"meshservice"}))
@@ -765,6 +766,8 @@ ipam:
    cidr: 251.0.0.0/8
  meshExternalService:
    cidr: 252.0.0.0/8
  meshMultiZoneService:
    cidr: 253.0.0.0/8
  allocationInterval: 7s
`,
}),
@@ -1049,6 +1052,7 @@ ipam:
"KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL": "7s",
"KUMA_IPAM_MESH_SERVICE_CIDR": "251.0.0.0/8",
"KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR": "252.0.0.0/8",
"KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR": "253.0.0.0/8",
"KUMA_IPAM_ALLOCATION_INTERVAL": "7s",
},
yamlFileConfig: "",
128 changes: 105 additions & 23 deletions pkg/core/resources/apis/core/vip/allocator.go
@@ -2,48 +2,70 @@ package vip

import (
"context"
"net"
"time"

"github.com/Nordix/simple-ipam/pkg/ipam"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/core/user"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
)

type ResourceHoldingVIPs interface {
model.Resource
VIPs() []string
AllocateVIP(vip string)
}
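The allocator keeps one Nordix/simple-ipam instance per CIDR: at startup it reserves every VIP already present on resources, then hands out free addresses on each tick. A minimal sketch of those semantics, assuming (consistently with the test expectations further down) that allocation starts from the lowest free address:

package main

import (
	"fmt"
	"net"

	"github.com/Nordix/simple-ipam/pkg/ipam"
)

func main() {
	m, err := ipam.New("243.0.0.0/8")
	if err != nil {
		panic(err)
	}
	// simulate startup: one existing resource already holds 243.0.0.0
	if err := m.Reserve(net.ParseIP("243.0.0.0")); err != nil {
		panic(err)
	}
	ip, err := m.Allocate() // next free address; nothing is ever freed in this scheme
	if err != nil {
		panic(err)
	}
	fmt.Println(ip.String()) // 243.0.0.1
}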

// Allocator manages IPs for resources holding VIPs, like MeshService/MeshExternalService/MeshMultiZoneService.
// Each time the allocator starts, it initiates the IPAM based on existing VIPs.
// We don't free addresses explicitly, but we always allocate the next free IP to avoid a problem where we
// 1) Remove MeshService A with IP X
// 2) Add new MeshService B that gets IP X
// 3) Clients that were sending traffic to A now send traffic to B for a brief amount of time
// IPAM is kept in memory to avoid state management, so technically this problem can still happen when the leader changes.
// However, the leader should not change before the TTL of the DNS entry that serves this VIP.
//
// It's technically possible to exhaust all addresses by creating and removing services in a loop.
// However, the CIDR has a range of 16M addresses; after that, the component will just restart.
type Allocator struct {
logger logr.Logger
interval time.Duration
cidrToDescriptors map[string]model.ResourceTypeDescriptor
resManager manager.ResourceManager
metric prometheus.Summary
}

var _ component.Component = &Allocator{}

func NewAllocator(
logger logr.Logger,
interval time.Duration,
cidrToDescriptors map[string]model.ResourceTypeDescriptor,
metrics core_metrics.Metrics,
resManager manager.ResourceManager,
) (*Allocator, error) {
metric := prometheus.NewSummary(prometheus.SummaryOpts{
Name: "component_vip_allocator",
Help: "Summary of VIP allocation duration",
Objectives: core_metrics.DefaultObjectives,
})
if err := metrics.Register(metric); err != nil {
return nil, err
}
return &Allocator{
logger: logger,
interval: interval,
cidrToDescriptors: cidrToDescriptors,
metric: metric,
resManager: resManager,
}, nil
}

@@ -52,22 +74,37 @@ func (a *Allocator) Start(stop <-chan struct{}) error {
ticker := time.NewTicker(a.interval)
ctx := user.Ctx(context.Background(), user.ControlPlane)

ipams := map[string]*ipam.IPAM{}

for cidr, typeDescriptor := range a.cidrToDescriptors {
newIPAM, err := ipam.New(cidr)
if err != nil {
return errors.Wrapf(err, "could not allocate IPAM of CIDR %s", cidr)
}

resources, err := a.listResourceHoldingVIPs(ctx, typeDescriptor)
if err != nil {
return errors.Wrapf(err, "could not list resources for IPAM initialization for %s", typeDescriptor.Name)
}

for _, res := range resources {
for _, vip := range res.VIPs() {
_ = newIPAM.Reserve(net.ParseIP(vip)) // ignore the error for VIPs outside of the CIDR range
}
}
ipams[cidr] = newIPAM
}

for {
select {
case <-ticker.C:
start := time.Now()
for cidr, typeDesc := range a.cidrToDescriptors {
if err := a.allocateVIPs(ctx, typeDesc, ipams[cidr]); err != nil {
a.logger.Error(err, "could not allocate vips", "type", typeDesc.Name)
}
}
a.metric.Observe(float64(time.Since(start).Milliseconds()))

case <-stop:
a.logger.Info("stopping")
@@ -76,6 +113,51 @@
}
}

func (a *Allocator) listResourceHoldingVIPs(ctx context.Context, typeDesc model.ResourceTypeDescriptor) ([]ResourceHoldingVIPs, error) {
list := typeDesc.NewList()
if err := a.resManager.List(ctx, list); err != nil {
return nil, err
}
result := make([]ResourceHoldingVIPs, len(list.GetItems()))
for i, res := range list.GetItems() {
result[i] = res.(ResourceHoldingVIPs)
}
return result, nil
}
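listResourceHoldingVIPs assumes every item produced by the descriptor's NewList() also satisfies ResourceHoldingVIPs; the unchecked assertion panics otherwise. A toy sketch of that pattern with hypothetical types (not the kuma resource model):

package main

import "fmt"

// toy stand-ins for model.Resource and a VIP-holding resource
type resource interface{ Name() string }

type vipHolder interface {
	resource
	VIPs() []string
}

type svc struct{ name string }

func (s svc) Name() string   { return s.name }
func (s svc) VIPs() []string { return nil }

func main() {
	items := []resource{svc{name: "backend"}}
	holders := make([]vipHolder, len(items))
	for i, it := range items {
		holders[i] = it.(vipHolder) // would panic if a descriptor mapped to a non-VIP type
	}
	fmt.Println(holders[0].Name()) // backend
}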

func (a *Allocator) allocateVIPs(ctx context.Context, typeDesc model.ResourceTypeDescriptor, kumaIpam *ipam.IPAM) error {
resources, err := a.listResourceHoldingVIPs(ctx, typeDesc)
if err != nil {
return err
}

for _, resource := range resources {
if len(resource.VIPs()) == 0 {
log := a.logger.WithValues(
"name", resource.GetMeta().GetName(),
"mesh", resource.GetMeta().GetMesh(),
"type", resource.Descriptor().Name,
)
ip, err := kumaIpam.Allocate()
if err != nil {
return errors.Wrapf(err, "could not allocate vip for %s %s", typeDesc.Name, resource.GetMeta().GetName())
}
log.Info("allocating IP", "ip", ip.String())
resource.AllocateVIP(ip.String())

if err := a.resManager.Update(ctx, resource); err != nil {
msg := "could not update the resource with allocated Kuma VIP. Will try to update in the next allocation window"
if errors.Is(err, &store.ResourceConflictError{}) {
log.Info(msg, "cause", "conflict", "interval", a.interval)
} else {
log.Error(err, msg, "interval", a.interval)
}
}
}
}
return nil
}

func (a *Allocator) NeedLeaderElection() bool {
return true
}
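On update conflicts the code above deliberately logs at info level and waits for the next allocation window instead of failing the component. A toy sketch of why errors.Is matches there: a stand-in conflict type implementing Is, so even a wrapped error is recognized (the real store.ResourceConflictError is assumed to behave similarly):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// toy stand-in for store.ResourceConflictError
type resourceConflictError struct{ msg string }

func (e *resourceConflictError) Error() string { return e.msg }

// Is makes errors.Is match by type rather than by pointer identity
func (e *resourceConflictError) Is(target error) bool {
	_, ok := target.(*resourceConflictError)
	return ok
}

func main() {
	err := errors.Wrap(&resourceConflictError{msg: "version mismatch"}, "could not update resource")
	fmt.Println(errors.Is(err, &resourceConflictError{})) // true: benign, retried next interval
}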
@@ -1,4 +1,4 @@
package vip_test

import (
"context"
@@ -9,6 +9,7 @@ import (
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/core/resources/apis/core/vip"
meshextenralservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshexternalservice/api/v1alpha1"
meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
@@ -29,9 +30,18 @@ var _ = Describe("VIP Allocator", func() {
Expect(err).ToNot(HaveOccurred())
metrics = m
resManager = manager.NewResourceManager(memory.NewStore())

allocator, err := vip.NewAllocator(
logr.Discard(),
50*time.Millisecond,
map[string]model.ResourceTypeDescriptor{
"241.0.0.0/8": meshservice_api.MeshServiceResourceTypeDescriptor,
"242.0.0.0/8": meshextenralservice_api.MeshExternalServiceResourceTypeDescriptor,
},
metrics,
resManager,
)

Expect(err).ToNot(HaveOccurred())
stopCh = make(chan struct{})
go func() {
@@ -56,7 +66,7 @@
return ms.Status.VIPs[0].IP
}

It("should allocate vip for service without vip", func() {
It("should allocate vip for MeshService without vip", func() {
// when
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())
@@ -67,6 +77,20 @@
}, "10s", "100ms").Should(Succeed())
})

It("should allocate vip for MeshExternalService without vip in different CIDR", func() {
// when
err := samples.MeshExternalServiceExampleBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func(g Gomega) {
mes := meshextenralservice_api.NewMeshExternalServiceResource()
err := resManager.Get(context.Background(), mes, store.GetByKey("example", model.DefaultMesh))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mes.Status.VIP.IP).To(Equal("242.0.0.0"))
}, "10s", "100ms").Should(Succeed())
})

It("should not reuse IPs", func() {
// given
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
@@ -89,7 +113,7 @@

It("should emit metric", func() {
Eventually(func(g Gomega) {
g.Expect(test_metrics.FindMetric(metrics, "component_vip_allocator")).ToNot(BeNil())
}, "10s", "100ms").Should(Succeed())
})
})
@@ -7,5 +7,5 @@ import (
)

func TestVIP(t *testing.T) {
test.RunSpecs(t, "MeshService VIP Suite")
test.RunSpecs(t, "VIP Suite")
}
@@ -1,6 +1,9 @@
package v1alpha1

import (
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/core/vip"
)

func (m *MeshExternalServiceResource) DestinationName(port uint32) string {
return m.GetMeta().GetName()
@@ -9,3 +12,18 @@ func (m *MeshExternalServiceResource) DestinationName(port uint32) string {
func (m *MeshExternalServiceResource) IsReachableFromZone(zone string) bool {
return m.GetMeta().GetLabels() == nil || m.GetMeta().GetLabels()[mesh_proto.ZoneTag] == "" || m.GetMeta().GetLabels()[mesh_proto.ZoneTag] == zone
}

var _ vip.ResourceHoldingVIPs = &MeshExternalServiceResource{}

func (t *MeshExternalServiceResource) VIPs() []string {
if t.Status.VIP.IP == "" {
return nil
}
return []string{t.Status.VIP.IP}
}

func (t *MeshExternalServiceResource) AllocateVIP(vip string) {
t.Status.VIP = VIP{
IP: vip,
}
}
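For reference, the round trip the allocator relies on, shown with a hypothetical in-memory type (toy code; the real interface also embeds model.Resource): an empty VIPs() marks the resource for allocation, and AllocateVIP makes the address visible on the next listing.

package main

import "fmt"

// fakeService is a toy stand-in for a resource holding VIPs
type fakeService struct{ vip string }

func (f *fakeService) VIPs() []string {
	if f.vip == "" {
		return nil
	}
	return []string{f.vip}
}

func (f *fakeService) AllocateVIP(vip string) { f.vip = vip }

func main() {
	svc := &fakeService{}
	if len(svc.VIPs()) == 0 { // the check allocateVIPs performs per resource
		svc.AllocateVIP("243.0.0.1")
	}
	fmt.Println(svc.VIPs()) // [243.0.0.1]
}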