Skip to content

Commit

Permalink
feat(meshservice): ipam (#10320)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed May 28, 2024
1 parent b94f248 commit 3220769
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 0 deletions.
5 changes: 5 additions & 0 deletions app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/kumahq/kuma/pkg/hds"
"github.com/kumahq/kuma/pkg/insights"
"github.com/kumahq/kuma/pkg/intercp"
"github.com/kumahq/kuma/pkg/ipam"
kds_global "github.com/kumahq/kuma/pkg/kds/global"
kds_zone "github.com/kumahq/kuma/pkg/kds/zone"
mads_server "github.com/kumahq/kuma/pkg/mads/server"
Expand Down Expand Up @@ -145,6 +146,10 @@ func newRunCmdWithOpts(opts kuma_cmd.RunCmdOpts) *cobra.Command {
runLog.Error(err, "unable to set up Control Plane Intercommunication")
return err
}
if err := ipam.Setup(rt); err != nil {
runLog.Error(err, "unable to set up IPAM")
return err
}

runLog.Info("starting Control Plane", "version", kuma_version.Build.Version)
if err := rt.Start(gracefulCtx.Done()); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -793,3 +793,11 @@ coreResources:
enabled: # ENV: KUMA_CORE_RESOURCES_ENABLED
- meshexternalservices
- meshservices
# IP address management configuration
ipam:
# MeshService address management
meshService:
# CIDR for MeshService IPs
cidr: 241.0.0.0/8 # ENV: KUMA_IPAM_MESH_SERVICE_CIDR
# Interval on which Kuma will allocate new IPs for MeshServices
allocationInterval: 5s # ENV: KUMA_IPAM_MESH_SERVICE_ALLOCATION_INTERVAL
37 changes: 37 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kuma_cp

import (
"net"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -176,6 +177,8 @@ type Config struct {
Policies *policies.Config `json:"policies"`
// CoreResources holds configuration for generated core resources like MeshService
CoreResources *apis.Config `json:"coreResources"`
// IP administration and management config
IPAM IPAMConfig `json:"ipam"`
}

func (c Config) IsFederatedZoneCP() bool {
Expand Down Expand Up @@ -277,6 +280,12 @@ var DefaultConfig = func() Config {
EventBus: eventbus.Default(),
Policies: policies.Default(),
CoreResources: apis.Default(),
IPAM: IPAMConfig{
MeshService: MeshServiceIPAM{
CIDR: "241.0.0.0/8",
AllocationInterval: config_types.Duration{Duration: 5 * time.Second},
},
},
}
}

Expand Down Expand Up @@ -339,6 +348,9 @@ func (c *Config) Validate() error {
if err := c.Policies.Validate(); err != nil {
return errors.Wrap(err, "Policies validation failed")
}
if err := c.IPAM.Validate(); err != nil {
return errors.Wrap(err, "IPAM validation failed")
}
return nil
}

Expand Down Expand Up @@ -453,6 +465,31 @@ type ExperimentalKDSEventBasedWatchdog struct {
DelayFullResync bool `json:"delayFullResync" envconfig:"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_DELAY_FULL_RESYNC"`
}

// IPAMConfig holds the IP address management (IPAM) settings of the control plane.
type IPAMConfig struct {
// MeshService configures address management for MeshServices.
MeshService MeshServiceIPAM `json:"meshService"`
}

// Validate checks every IPAM section and returns the first validation error,
// wrapped with the name of the section that failed. It returns nil when the
// whole IPAM configuration is valid.
func (i IPAMConfig) Validate() error {
	if err := i.MeshService.Validate(); err != nil {
		return errors.Wrap(err, "MeshService validation failed")
	}
	return nil
}

// MeshServiceIPAM configures how IPs are allocated for MeshServices.
type MeshServiceIPAM struct {
// CIDR for MeshService IPs
CIDR string `json:"cidr" envconfig:"KUMA_IPAM_MESH_SERVICE_CIDR"`
// Interval on which Kuma will allocate new IPs for MeshServices
AllocationInterval config_types.Duration `json:"allocationInterval" envconfig:"KUMA_IPAM_MESH_SERVICE_ALLOCATION_INTERVAL"`
}

// Validate checks the MeshService IPAM settings.
//
// It returns an error when CIDR cannot be parsed by net.ParseCIDR or when
// AllocationInterval is not a positive duration, and nil otherwise.
func (i MeshServiceIPAM) Validate() error {
	if _, _, err := net.ParseCIDR(i.CIDR); err != nil {
		return errors.Wrap(err, ".CIDR is invalid")
	}
	// A non-positive interval would make time.NewTicker panic if it reaches
	// the VIP allocator unchanged, so reject it at configuration time.
	if i.AllocationInterval.Duration <= 0 {
		return errors.New(".AllocationInterval must be greater than 0")
	}
	return nil
}

func (c Config) GetEnvoyAdminPort() uint32 {
if c.BootstrapServer == nil || c.BootstrapServer.Params == nil {
return 0
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -793,3 +793,11 @@ coreResources:
enabled: # ENV: KUMA_CORE_RESOURCES_ENABLED
- meshexternalservices
- meshservices
# IP address management configuration
ipam:
# MeshService address management
meshService:
# CIDR for MeshService IPs
cidr: 241.0.0.0/8 # ENV: KUMA_IPAM_MESH_SERVICE_CIDR
# Interval on which Kuma will allocate new IPs for MeshServices
allocationInterval: 5s # ENV: KUMA_IPAM_MESH_SERVICE_ALLOCATION_INTERVAL
9 changes: 9 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ var _ = Describe("Config loader", func() {

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
Expect(cfg.EventBus.BufferSize).To(Equal(uint(30)))

Expect(cfg.IPAM.MeshService.CIDR).To(Equal("251.0.0.0/8"))
Expect(cfg.IPAM.MeshService.AllocationInterval.Duration).To(Equal(7 * time.Second))
},
Entry("from config file", testCase{
envVars: map[string]string{},
Expand Down Expand Up @@ -748,6 +751,10 @@ tracing:
openTelemetry:
enabled: true
endpoint: collector:4317
ipam:
meshService:
cidr: 251.0.0.0/8
allocationInterval: 7s
`,
}),
Entry("from env variables", testCase{
Expand Down Expand Up @@ -1026,6 +1033,8 @@ tracing:
"KUMA_EVENT_BUS_BUFFER_SIZE": "30",
"KUMA_PLUGIN_POLICIES_ENABLED": "meshaccesslog,meshcircuitbreaker",
"KUMA_CORE_RESOURCES_ENABLED": "meshservice",
"KUMA_IPAM_MESH_SERVICE_CIDR": "251.0.0.0/8",
"KUMA_IPAM_MESH_SERVICE_ALLOCATION_INTERVAL": "7s",
},
yamlFileConfig: "",
}),
Expand Down
146 changes: 146 additions & 0 deletions pkg/core/resources/apis/meshservice/vip/allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package vip

import (
"context"
"net"
"time"

"github.com/Nordix/simple-ipam/pkg/ipam"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/core/user"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
)

// Allocator manages IPs for MeshServices.
// Each time the allocator starts, it initializes the IPAM from the VIPs of the existing MeshServices.
// We don't free addresses explicitly, but we always allocate the next free IP to avoid the following problem:
// 1) Service A with IP X is removed
// 2) A new Service B is added and gets IP X
// 3) Clients that were sending traffic to A now send traffic to B for a brief amount of time
// The IPAM is kept in memory to avoid state management, so technically this problem can still happen when the leader changes.
// However, the leader should not change before the TTL of the DNS record that serves this VIP expires.
//
// It's technically possible to exhaust all addresses by creating and removing services in a loop.
// However, the CIDR has a range of 16M addresses; after that the component will just restart.
type Allocator struct {
// logger receives the allocator's lifecycle and allocation messages.
logger logr.Logger
// cidr is the range the in-memory IPAM is created from.
cidr string
// interval is the period of the allocation ticker.
interval time.Duration
// metric records how long each allocation run took, in milliseconds.
metric prometheus.Summary
// resManager lists MeshServices and stores the allocated VIPs.
resManager manager.ResourceManager
}

// Compile-time check that Allocator satisfies component.Component.
var _ component.Component = &Allocator{}

// NewAllocator creates a MeshService VIP allocator and registers its summary
// metric in metrics.
//
// logger is used for the allocator's messages, resManager for listing and
// updating MeshServices, cidr is the range VIPs are allocated from, and
// interval is the period between allocation runs.
//
// It returns an error when the metric cannot be registered.
func NewAllocator(
	logger logr.Logger,
	metrics core_metrics.Metrics,
	resManager manager.ResourceManager,
	cidr string,
	interval time.Duration,
) (*Allocator, error) {
	metric := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "component_ms_vip_allocator",
		Help:       "Summary of MeshService VIP allocator component interval",
		Objectives: core_metrics.DefaultObjectives,
	})
	if err := metrics.Register(metric); err != nil {
		return nil, err
	}

	return &Allocator{
		logger:     logger,
		resManager: resManager,
		cidr:       cidr,
		interval:   interval,
		metric:     metric,
	}, nil
}

// Start initializes the in-memory IPAM and then, on every tick of the
// configured interval, allocates VIPs for MeshServices that have none.
//
// It blocks until stop is closed (returning nil) or until IPAM
// initialization or an allocation run fails (returning that error).
func (a *Allocator) Start(stop <-chan struct{}) error {
	a.logger.Info("starting")
	ticker := time.NewTicker(a.interval)
	// Release the ticker on every return path, including the error ones.
	defer ticker.Stop()
	ctx := user.Ctx(context.Background(), user.ControlPlane)

	kumaIpam, err := a.initIpam(ctx)
	if err != nil {
		return err
	}

	for {
		select {
		case <-ticker.C:
			start := time.Now()
			if err := a.allocateVips(ctx, kumaIpam); err != nil {
				return err
			}
			// The metric is observed in milliseconds and only for successful runs.
			a.metric.Observe(float64(time.Since(start).Milliseconds()))
		case <-stop:
			a.logger.Info("stopping")
			return nil
		}
	}
}

// NeedLeaderElection always reports true.
// NOTE(review): presumably this makes the runtime start the allocator only on the leader instance — confirm against component.Component.
func (a *Allocator) NeedLeaderElection() bool {
return true
}

// initIpam builds a fresh IPAM for the allocator's CIDR and marks every VIP
// already present in the status of existing MeshServices as reserved.
//
// It returns an error when the IPAM cannot be created for the CIDR or when
// the MeshServices cannot be listed.
func (a *Allocator) initIpam(ctx context.Context) (*ipam.IPAM, error) {
	pool, err := ipam.New(a.cidr)
	if err != nil {
		return nil, errors.Wrapf(err, "could not allocate IPAM of CIDR %s", a.cidr)
	}

	existing := &meshservice_api.MeshServiceResourceList{}
	if listErr := a.resManager.List(ctx, existing); listErr != nil {
		return nil, errors.Wrap(listErr, "could not list mesh services for initialization of ipam")
	}

	for _, ms := range existing.Items {
		for idx := range ms.Status.VIPs {
			addr := net.ParseIP(ms.Status.VIPs[idx].IP)
			// Reservation errors are deliberately dropped: an address outside
			// of the CIDR simply cannot be reserved.
			_ = pool.Reserve(addr)
		}
	}

	return pool, nil
}

// allocateVips assigns a new address from kumaIpam to every MeshService whose
// status has no VIP yet and stores it through the resource manager.
//
// It returns an error when the MeshServices cannot be listed or when the IPAM
// cannot hand out an address. A failed update of a single service is only
// logged; that service still has no stored VIP, so it is picked up again on
// the next run.
func (a *Allocator) allocateVips(ctx context.Context, kumaIpam *ipam.IPAM) error {
	const updateFailedMsg = "could not update service with allocated Kuma VIP. Will try to update in the next allocation window"

	list := &meshservice_api.MeshServiceResourceList{}
	if err := a.resManager.List(ctx, list); err != nil {
		return errors.Wrap(err, "could not list mesh services for ip allocation")
	}

	for _, ms := range list.Items {
		// Services that already carry a VIP are left untouched.
		if len(ms.Status.VIPs) != 0 {
			continue
		}

		svcLog := a.logger.WithValues("service", ms.Meta.GetName(), "mesh", ms.Meta.GetMesh())
		addr, err := kumaIpam.Allocate()
		if err != nil {
			return errors.Wrapf(err, "could not allocate the address for svc %s", ms.Meta.GetName())
		}

		allocated := addr.String()
		svcLog.Info("allocating IP for a service", "ip", allocated)
		ms.Status.VIPs = []meshservice_api.VIP{{IP: allocated}}

		updateErr := a.resManager.Update(ctx, ms, store.UpdateWithLabels(ms.GetMeta().GetLabels()))
		switch {
		case updateErr == nil:
			// stored successfully, nothing to report
		case errors.Is(updateErr, &store.ResourceConflictError{}):
			// A conflict is an expected race with another writer, so it is logged at info level.
			svcLog.Info(updateFailedMsg, "cause", "conflict", "interval", a.interval)
		default:
			svcLog.Error(updateErr, updateFailedMsg, "interval", a.interval)
		}
	}
	return nil
}
92 changes: 92 additions & 0 deletions pkg/core/resources/apis/meshservice/vip/allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package vip

import (
"context"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
test_metrics "github.com/kumahq/kuma/pkg/test/metrics"
"github.com/kumahq/kuma/pkg/test/resources/samples"
)

// Specs for the MeshService VIP allocator, run against an in-memory resource store.
var _ = Describe("VIP Allocator", func() {
var stopCh chan struct{}
var resManager manager.ResourceManager
var metrics core_metrics.Metrics

// Every spec gets fresh metrics, a fresh in-memory store and its own allocator
// running in the background with a 50ms allocation interval.
BeforeEach(func() {
m, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
metrics = m
resManager = manager.NewResourceManager(memory.NewStore())
allocator, err := NewAllocator(logr.Discard(), m, resManager, "241.0.0.0/8", 50*time.Millisecond)
Expect(err).ToNot(HaveOccurred())
stopCh = make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(allocator.Start(stopCh)).To(Succeed())
}()

Expect(samples.MeshDefaultBuilder().Create(resManager)).To(Succeed())
})

// Closing stopCh makes the allocator's Start return.
AfterEach(func() {
close(stopCh)
})

// vipOfMeshService returns the first VIP of the named MeshService in the
// default mesh, or an empty string when the service has no VIP yet.
vipOfMeshService := func(name string) string {
ms := meshservice_api.NewMeshServiceResource()
err := resManager.Get(context.Background(), ms, store.GetByKey(name, model.DefaultMesh))
Expect(err).ToNot(HaveOccurred())
if len(ms.Status.VIPs) == 0 {
return ""
}
return ms.Status.VIPs[0].IP
}

It("should allocate vip for service without vip", func() {
// when
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func(g Gomega) {
g.Expect(vipOfMeshService("backend")).Should(Equal("241.0.0.0"))
}, "10s", "100ms").Should(Succeed())
})

It("should not reuse IPs", func() {
// given
err := samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())
Eventually(func(g Gomega) {
g.Expect(vipOfMeshService("backend")).Should(Equal("241.0.0.0"))
}, "10s", "100ms").Should(Succeed())

// when resource is reapplied
err = resManager.Delete(context.Background(), meshservice_api.NewMeshServiceResource(), store.DeleteByKey("backend", model.DefaultMesh))
Expect(err).ToNot(HaveOccurred())
err = samples.MeshServiceBackendBuilder().WithoutVIP().Create(resManager)
Expect(err).ToNot(HaveOccurred())

// then the recreated service gets the next address instead of the released one
Eventually(func(g Gomega) {
g.Expect(vipOfMeshService("backend")).Should(Equal("241.0.0.1"))
}, "10s", "100ms").Should(Succeed())
})

It("should emit metric", func() {
Eventually(func(g Gomega) {
g.Expect(test_metrics.FindMetric(metrics, "component_ms_vip_allocator")).ToNot(BeNil())
}, "10s", "100ms").Should(Succeed())
})
})
11 changes: 11 additions & 0 deletions pkg/core/resources/apis/meshservice/vip/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package vip_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

// TestVIP is the go test entry point that runs the "MeshService VIP Suite" specs.
func TestVIP(t *testing.T) {
test.RunSpecs(t, "MeshService VIP Suite")
}
Loading

0 comments on commit 3220769

Please sign in to comment.