Skip to content

Commit

Permalink
feat(meshmultizoneservice): vip allocation (#10667)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed Jul 1, 2024
1 parent 8df650b commit b058166
Show file tree
Hide file tree
Showing 18 changed files with 240 additions and 377 deletions.
3 changes: 3 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -813,5 +813,8 @@ ipam:
meshExternalService:
# CIDR for MeshExternalService IPs
cidr: 242.0.0.0/8 # ENV: KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR
meshMultiZoneService:
# CIDR for MeshMultiZoneService IPs
cidr: 243.0.0.0/8 # ENV: KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR
# Interval on which Kuma will allocate new IPs for MeshServices, MeshExternalServices and MeshMultiZoneServices
allocationInterval: 5s # ENV: KUMA_IPAM_ALLOCATION_INTERVAL
20 changes: 18 additions & 2 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ var DefaultConfig = func() Config {
MeshExternalService: MeshExternalServiceIPAM{
CIDR: "242.0.0.0/8",
},
MeshMultiZoneService: MeshMultiZoneServiceIPAM{
CIDR: "243.0.0.0/8",
},
AllocationInterval: config_types.Duration{Duration: 5 * time.Second},
},
}
Expand Down Expand Up @@ -469,8 +472,9 @@ type ExperimentalKDSEventBasedWatchdog struct {
}

type IPAMConfig struct {
MeshService MeshServiceIPAM `json:"meshService"`
MeshExternalService MeshExternalServiceIPAM `json:"meshExternalService"`
MeshService MeshServiceIPAM `json:"meshService"`
MeshExternalService MeshExternalServiceIPAM `json:"meshExternalService"`
MeshMultiZoneService MeshMultiZoneServiceIPAM `json:"meshMultiZoneService"`
// Interval on which Kuma will allocate new IPs and generate hostnames.
AllocationInterval config_types.Duration `json:"allocationInterval" envconfig:"KUMA_IPAM_ALLOCATION_INTERVAL"`
}
Expand Down Expand Up @@ -515,3 +519,15 @@ func (c Config) GetEnvoyAdminPort() uint32 {
}
return c.BootstrapServer.Params.AdminPort
}

// MeshMultiZoneServiceIPAM holds the IPAM settings for MeshMultiZoneService resources.
type MeshMultiZoneServiceIPAM struct {
// CIDR for MeshMultiZoneService IPs
CIDR string `json:"cidr" envconfig:"KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR"`
}

// Validate checks that the configured CIDR parses as a valid CIDR notation
// address. It returns the wrapped parse error otherwise.
func (i MeshMultiZoneServiceIPAM) Validate() error {
	_, _, parseErr := net.ParseCIDR(i.CIDR)
	if parseErr == nil {
		return nil
	}
	return errors.Wrap(parseErr, ".MeshMultiZoneServiceCIDR is invalid")
}
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -813,5 +813,8 @@ ipam:
meshExternalService:
# CIDR for MeshExternalService IPs
cidr: 242.0.0.0/8 # ENV: KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR
meshMultiZoneService:
# CIDR for MeshMultiZoneService IPs
cidr: 243.0.0.0/8 # ENV: KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR
# Interval on which Kuma will allocate new IPs for MeshServices, MeshExternalServices and MeshMultiZoneServices
allocationInterval: 5s # ENV: KUMA_IPAM_ALLOCATION_INTERVAL
4 changes: 4 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ var _ = Describe("Config loader", func() {

Expect(cfg.IPAM.MeshService.CIDR).To(Equal("251.0.0.0/8"))
Expect(cfg.IPAM.MeshExternalService.CIDR).To(Equal("252.0.0.0/8"))
Expect(cfg.IPAM.MeshMultiZoneService.CIDR).To(Equal("253.0.0.0/8"))
Expect(cfg.IPAM.AllocationInterval.Duration).To(Equal(7 * time.Second))

Expect(cfg.CoreResources.Enabled).To(Equal([]string{"meshservice"}))
Expand Down Expand Up @@ -765,6 +766,8 @@ ipam:
cidr: 251.0.0.0/8
meshExternalService:
cidr: 252.0.0.0/8
meshMultiZoneService:
cidr: 253.0.0.0/8
allocationInterval: 7s
`,
}),
Expand Down Expand Up @@ -1049,6 +1052,7 @@ ipam:
"KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL": "7s",
"KUMA_IPAM_MESH_SERVICE_CIDR": "251.0.0.0/8",
"KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR": "252.0.0.0/8",
"KUMA_IPAM_MESH_MULTI_ZONE_SERVICE_CIDR": "253.0.0.0/8",
"KUMA_IPAM_ALLOCATION_INTERVAL": "7s",
},
yamlFileConfig: "",
Expand Down
128 changes: 105 additions & 23 deletions pkg/core/resources/apis/core/vip/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,70 @@ package vip

import (
"context"
"net"
"time"

"github.com/Nordix/simple-ipam/pkg/ipam"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/core/user"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
)

type VIPAllocator interface {
ReportDuration(start time.Time)
InitIPAM(ctx context.Context) error
AllocateVIPs(context.Context) error
// ResourceHoldingVIPs is a resource whose virtual IPs are managed by the Allocator.
type ResourceHoldingVIPs interface {
model.Resource
// VIPs returns the VIPs currently held by the resource.
// The Allocator assigns a new VIP only when this returns an empty slice.
VIPs() []string
// AllocateVIP records the given VIP on the resource.
AllocateVIP(vip string)
}

// Allocator manages IPs for MeshExternalServices.
// Each time allocator starts it initiates the IPAM based on existing MeshExternalServices
// Allocator manages IPs for resources holding vips like MeshService/MeshExternalServices/MeshMultiZoneService.
// Each time allocator starts it initiates the IPAM based on existing vips
// We don't free addresses explicitly, but we always allocate next free IP to avoid a problem when we
// 1) Remove MeshExternalService A with IP X
// 2) Add new MeshExternalService B that gets IP X
// 1) Remove MeshService A with IP X
// 2) Add new MeshService B that gets IP X
// 3) Clients that were sending traffic to A now send traffic to B for a brief amount of time
// IPAM is kept in memory to avoid state management, so technically this problem can still happen when leader changes
// However, leader should not change before TTL of a DNS that serves this VIP.
//
// It's technically possible to allocate all addresses by creating and removing services in the loop.
// However, CIDR has range of 16M addresses, after that the component will just restart.
type Allocator struct {
logger logr.Logger
interval time.Duration
allocators []VIPAllocator
logger logr.Logger
interval time.Duration
cidrToDescriptors map[string]model.ResourceTypeDescriptor
resManager manager.ResourceManager
metric prometheus.Summary
}

var _ component.Component = &Allocator{}

func NewAllocator(
logger logr.Logger,
interval time.Duration,
allocators []VIPAllocator,
cidrToDescriptors map[string]model.ResourceTypeDescriptor,
metrics core_metrics.Metrics,
resManager manager.ResourceManager,
) (*Allocator, error) {
metric := prometheus.NewSummary(prometheus.SummaryOpts{
Name: "component_vip_allocator",
Help: "Summary of VIP allocation duration",
Objectives: core_metrics.DefaultObjectives,
})
if err := metrics.Register(metric); err != nil {
return nil, err
}
return &Allocator{
logger: logger,
interval: interval,
allocators: allocators,
logger: logger,
interval: interval,
cidrToDescriptors: cidrToDescriptors,
metric: metric,
resManager: resManager,
}, nil
}

Expand All @@ -52,22 +74,37 @@ func (a *Allocator) Start(stop <-chan struct{}) error {
ticker := time.NewTicker(a.interval)
ctx := user.Ctx(context.Background(), user.ControlPlane)

for _, allocator := range a.allocators {
if err := allocator.InitIPAM(ctx); err != nil {
return err
ipams := map[string]*ipam.IPAM{}

for cidr, typeDescriptor := range a.cidrToDescriptors {
newIPAM, err := ipam.New(cidr)
if err != nil {
return errors.Wrapf(err, "could not allocate IPAM of CIDR %s", cidr)
}

resources, err := a.listResourceHoldingVIPs(ctx, typeDescriptor)
if err != nil {
return errors.Wrapf(err, "could not list resources for IPAM initialization for %s", typeDescriptor.Name)
}

for _, res := range resources {
for _, vip := range res.VIPs() {
_ = newIPAM.Reserve(net.ParseIP(vip)) // ignore error for outside of range
}
}
ipams[cidr] = newIPAM
}

for {
select {
case <-ticker.C:
for _, allocator := range a.allocators {
start := time.Now()
if err := allocator.AllocateVIPs(ctx); err != nil {
return err
start := time.Now()
for cidr, typeDesc := range a.cidrToDescriptors {
if err := a.allocateVIPs(ctx, typeDesc, ipams[cidr]); err != nil {
a.logger.Error(err, "could not allocate vips", "type", typeDesc.Name)
}
allocator.ReportDuration(start)
}
a.metric.Observe(float64(time.Since(start).Milliseconds()))

case <-stop:
a.logger.Info("stopping")
Expand All @@ -76,6 +113,51 @@ func (a *Allocator) Start(stop <-chan struct{}) error {
}
}

// listResourceHoldingVIPs lists every resource of the given type through the
// resource manager and returns the items as ResourceHoldingVIPs.
//
// It returns an error when listing fails or when a listed item does not
// implement ResourceHoldingVIPs. The latter used to panic on an unchecked type
// assertion. Callers are expected to add the resource type to the error context.
func (a *Allocator) listResourceHoldingVIPs(ctx context.Context, typeDesc model.ResourceTypeDescriptor) ([]ResourceHoldingVIPs, error) {
	list := typeDesc.NewList()
	if err := a.resManager.List(ctx, list); err != nil {
		return nil, err
	}

	items := list.GetItems()
	result := make([]ResourceHoldingVIPs, 0, len(items))
	for _, item := range items {
		res, ok := item.(ResourceHoldingVIPs)
		if !ok {
			// A misconfigured type descriptor must not crash the whole component.
			return nil, errors.New("listed resource does not implement ResourceHoldingVIPs")
		}
		result = append(result, res)
	}
	return result, nil
}

// allocateVIPs assigns a VIP from the given IPAM to every resource of the
// given type that does not hold one yet, then persists the resource.
//
// A failure to list the resources or to allocate an address aborts the pass
// and is returned. A failed update is only logged: conflicts at info level,
// anything else as an error.
func (a *Allocator) allocateVIPs(ctx context.Context, typeDesc model.ResourceTypeDescriptor, kumaIpam *ipam.IPAM) error {
	const updateFailedMsg = "could not update the resource with allocated Kuma VIP. Will try to update in the next allocation window"

	candidates, err := a.listResourceHoldingVIPs(ctx, typeDesc)
	if err != nil {
		return err
	}

	for _, res := range candidates {
		// Resources that already hold a VIP are left untouched.
		if len(res.VIPs()) != 0 {
			continue
		}

		resLog := a.logger.WithValues(
			"name", res.GetMeta().GetName(),
			"mesh", res.GetMeta().GetMesh(),
			"type", res.Descriptor().Name,
		)

		allocated, allocErr := kumaIpam.Allocate()
		if allocErr != nil {
			return errors.Wrapf(allocErr, "could not allocate vip for %s %s", typeDesc.Name, res.GetMeta().GetName())
		}
		address := allocated.String()
		resLog.Info("allocating IP", "ip", address)
		res.AllocateVIP(address)

		updateErr := a.resManager.Update(ctx, res)
		switch {
		case updateErr == nil:
			// Stored successfully, nothing to report.
		case errors.Is(updateErr, &store.ResourceConflictError{}):
			resLog.Info(updateFailedMsg, "cause", "conflict", "interval", a.interval)
		default:
			resLog.Error(updateErr, updateFailedMsg, "interval", a.interval)
		}
	}
	return nil
}

// NeedLeaderElection always returns true for the Allocator.
func (a *Allocator) NeedLeaderElection() bool {
return true
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vip
package vip_test

import (
"context"
Expand All @@ -9,6 +9,7 @@ import (
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/core/resources/apis/core/vip"
meshextenralservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshexternalservice/api/v1alpha1"
meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
Expand All @@ -29,9 +30,18 @@ var _ = Describe("VIP Allocator", func() {
Expect(err).ToNot(HaveOccurred())
metrics = m
resManager = manager.NewResourceManager(memory.NewStore())
meshServiceAllocator, err := NewMeshServiceAllocator(logr.Discard(), "241.0.0.0/8", resManager, 50*time.Millisecond, m)
Expect(err).ToNot(HaveOccurred())
allocator, err := vip.NewAllocator(logr.Discard(), 50*time.Millisecond, []vip.VIPAllocator{meshServiceAllocator})

allocator, err := vip.NewAllocator(
logr.Discard(),
50*time.Millisecond,
map[string]model.ResourceTypeDescriptor{
"241.0.0.0/8": meshservice_api.MeshServiceResourceTypeDescriptor,
"242.0.0.0/8": meshextenralservice_api.MeshExternalServiceResourceTypeDescriptor,
},
metrics,
resManager,
)

Expect(err).ToNot(HaveOccurred())
stopCh = make(chan struct{})
go func() {
Expand All @@ -56,7 +66,7 @@ var _ = Describe("VIP Allocator", func() {
return ms.Status.VIPs[0].IP
}

It("should allocate vip for service without vip", func() {
It("should allocate vip for MeshService without vip", func() {
// when
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -67,6 +77,20 @@ var _ = Describe("VIP Allocator", func() {
}, "10s", "100ms").Should(Succeed())
})

It("should allocate vip for MeshExternalService without vip in different CIDR", func() {
// when
err := samples.MeshExternalServiceExampleBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func(g Gomega) {
mes := meshextenralservice_api.NewMeshExternalServiceResource()
err := resManager.Get(context.Background(), mes, store.GetByKey("example", model.DefaultMesh))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mes.Status.VIP.IP).To(Equal("242.0.0.0"))
}, "10s", "100ms").Should(Succeed())
})

It("should not reuse IPs", func() {
// given
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expand All @@ -89,7 +113,7 @@ var _ = Describe("VIP Allocator", func() {

It("should emit metric", func() {
Eventually(func(g Gomega) {
g.Expect(test_metrics.FindMetric(metrics, "component_ms_vip_allocator")).ToNot(BeNil())
g.Expect(test_metrics.FindMetric(metrics, "component_vip_allocator")).ToNot(BeNil())
}, "10s", "100ms").Should(Succeed())
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import (
)

func TestVIP(t *testing.T) {
test.RunSpecs(t, "MeshService VIP Suite")
test.RunSpecs(t, "VIP Suite")
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package v1alpha1

import mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
import (
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/core/vip"
)

func (m *MeshExternalServiceResource) DestinationName(port uint32) string {
return m.GetMeta().GetName()
Expand All @@ -9,3 +12,18 @@ func (m *MeshExternalServiceResource) DestinationName(port uint32) string {
// IsReachableFromZone reports whether the resource can be reached from the
// given zone. That is the case when the resource has no labels, has no
// (or an empty) zone label, or its zone label equals zone.
func (m *MeshExternalServiceResource) IsReachableFromZone(zone string) bool {
	labels := m.GetMeta().GetLabels()
	if labels == nil {
		return true
	}
	switch labels[mesh_proto.ZoneTag] {
	case "", zone:
		return true
	default:
		return false
	}
}

var _ vip.ResourceHoldingVIPs = &MeshExternalServiceResource{}

// VIPs returns the VIP stored in the resource status as a single-element
// slice, or nil when no VIP has been set yet.
func (m *MeshExternalServiceResource) VIPs() []string {
	// Receiver is named m to match the other methods of this type.
	if ip := m.Status.VIP.IP; ip != "" {
		return []string{ip}
	}
	return nil
}

// AllocateVIP replaces the VIP in the resource status with one holding the
// given IP.
//
// The parameter is named ip so that it does not shadow the imported vip
// package; the receiver is named m to match the other methods of this type.
func (m *MeshExternalServiceResource) AllocateVIP(ip string) {
	m.Status.VIP = VIP{
		IP: ip,
	}
}
Loading

0 comments on commit b058166

Please sign in to comment.