Skip to content

Commit

Permalink
feat(kuma-dp): add a separate component to handle kuma-sidecar readin…
Browse files Browse the repository at this point in the history
…ess probes (#11107)

* feat(kuma-dp): add a readiness handler
* feat(kuma-dp): support TERMINATING state
* use 9902 to expose dp readiness
* support port 0 to disable the dp readiness and use Envoy
* support ipv6 address: reuse AdminAddress and make readiness reporter listen on it
* don't send 503 when receiving SIGUSR2

---------

Signed-off-by: Jay Chen <1180092+jijiechen@users.noreply.github.com>
  • Loading branch information
jijiechen authored and kumahq[bot] committed Aug 28, 2024
1 parent 7f3acae commit 5f19089
Show file tree
Hide file tree
Showing 28 changed files with 693 additions and 15 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,14 @@ jobs:
target="test/e2e"
fi
make ${MAKE_PARAMETERS} CI=true "${target}"
<<<<<<< HEAD:.github/workflows/e2e.yaml
=======
- uses: actions/upload-artifact@834a144ee995460fba8ed112a2fc961b36a5ec5a # v4.3.6
if: always()
with:
name: e2e-debug-${{ env.E2E_PARAM_TARGET }}
if-no-files-found: ignore
path: |
/tmp/e2e-debug/
retention-days: ${{ github.event_name == 'pull_request' && 1 || 30 }}
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107)):.github/workflows/_e2e.yaml
57 changes: 57 additions & 0 deletions app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/envoy"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/meshmetrics"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/metrics"
<<<<<<< HEAD
=======
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/probes"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/readiness"
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
kuma_cmd "github.com/kumahq/kuma/pkg/cmd"
"github.com/kumahq/kuma/pkg/config"
kumadp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
Expand Down Expand Up @@ -176,6 +181,7 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {

runLog.Info("generating bootstrap configuration")
bootstrap, kumaSidecarConfiguration, err := rootCtx.BootstrapGenerator(gracefulCtx, opts.Config.ControlPlane.URL, opts.Config, envoy.BootstrapParams{
<<<<<<< HEAD
Dataplane: opts.Dataplane,
DNSPort: cfg.DNS.EnvoyDNSPort,
EmptyDNSPort: cfg.DNS.CoreDNSEmptyPort,
Expand All @@ -186,6 +192,17 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
DynamicMetadata: rootCtx.BootstrapDynamicMetadata,
MetricsCertPath: cfg.DataplaneRuntime.Metrics.CertPath,
MetricsKeyPath: cfg.DataplaneRuntime.Metrics.KeyPath,
=======
Dataplane: opts.Dataplane,
DNSPort: cfg.DNS.EnvoyDNSPort,
ReadinessPort: cfg.Dataplane.ReadinessPort,
EnvoyVersion: *envoyVersion,
Workdir: cfg.DataplaneRuntime.SocketDir,
DynamicMetadata: rootCtx.BootstrapDynamicMetadata,
MetricsCertPath: cfg.DataplaneRuntime.Metrics.CertPath,
MetricsKeyPath: cfg.DataplaneRuntime.Metrics.KeyPath,
SystemCaPath: cfg.DataplaneRuntime.SystemCaPath,
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
})
if err != nil {
return errors.Errorf("Failed to generate Envoy bootstrap config. %v", err)
Expand Down Expand Up @@ -233,6 +250,15 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {

observabilityComponents := setupObservability(kumaSidecarConfiguration, bootstrap, cfg)
components = append(components, observabilityComponents...)

var readinessReporter *readiness.Reporter
if cfg.Dataplane.ReadinessPort > 0 {
readinessReporter = readiness.NewReporter(
bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetAddress(),
cfg.Dataplane.ReadinessPort)
components = append(components, readinessReporter)
}

if err := rootCtx.ComponentManager.Add(components...); err != nil {
return err
}
Expand All @@ -250,6 +276,37 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
case <-time.After(cfg.Dataplane.DrainTime.Duration):
case <-ctx.Done():
}
<<<<<<< HEAD
=======
if !draining {
runLog.Info("draining Envoy connections")
if err := envoyComponent.DrainForever(); err != nil {
runLog.Error(err, "could not drain connections")
}
}
draining = true
continue
case <-gracefulCtx.Done():
runLog.Info("Kuma DP caught an exit signal")
if draining {
runLog.Info("already drained, exit immediately")
} else {
if readinessReporter != nil {
readinessReporter.Terminating()
}
runLog.Info("draining Envoy connections")
if err := envoyComponent.FailHealthchecks(); err != nil {
runLog.Error(err, "could not drain connections")
} else {
runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
select {
case <-time.After(cfg.Dataplane.DrainTime.Duration):
case <-ctx.Done():
}
}
}
case <-componentCtx.Done():
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
}
case <-componentCtx.Done():
}
Expand Down
12 changes: 12 additions & 0 deletions app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
var runLog = core.Log.WithName("kuma-dp").WithName("run").WithName("envoy")

type BootstrapParams struct {
<<<<<<< HEAD
Dataplane rest.Resource
DNSPort uint32
EmptyDNSPort uint32
Expand All @@ -38,6 +39,17 @@ type BootstrapParams struct {
AccessLogSocketPath string
MetricsCertPath string
MetricsKeyPath string
=======
Dataplane rest.Resource
DNSPort uint32
ReadinessPort uint32
EnvoyVersion EnvoyVersion
DynamicMetadata map[string]string
Workdir string
MetricsCertPath string
MetricsKeyPath string
SystemCaPath string
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
}

type BootstrapConfigFactoryFunc func(ctx context.Context, url string, cfg kuma_dp.Config, params BootstrapParams) (*envoy_bootstrap_v3.Bootstrap, *types.KumaSidecarConfiguration, error)
Expand Down
10 changes: 10 additions & 0 deletions app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http.
KumaDpCompatible: params.EnvoyVersion.KumaDpCompatible,
},
},
<<<<<<< HEAD
DynamicMetadata: params.DynamicMetadata,
DNSPort: params.DNSPort,
EmptyDNSPort: params.EmptyDNSPort,
Expand All @@ -185,6 +186,15 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http.
Resources: resources,
Workdir: params.Workdir,
AccessLogSocketPath: params.AccessLogSocketPath,
=======
DynamicMetadata: params.DynamicMetadata,
DNSPort: params.DNSPort,
ReadinessPort: params.ReadinessPort,
OperatingSystem: b.operatingSystem,
Features: b.features,
Resources: resources,
Workdir: params.Workdir,
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
MetricsResources: types.MetricsResources{
SocketPath: params.MetricsSocketPath,
CertPath: params.MetricsCertPath,
Expand Down
111 changes: 111 additions & 0 deletions app/kuma-dp/pkg/dataplane/readiness/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package readiness

import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/asaskevich/govalidator"
"github.com/bakito/go-log-logr-adapter/adapter"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/runtime/component"
)

const (
pathPrefixReady = "/ready"
stateReady = "READY"
stateTerminating = "TERMINATING"
)

// Reporter reports the health status of this Kuma Dataplane Proxy
type Reporter struct {
localListenAddr string
localListenPort uint32
isTerminating atomic.Bool
}

var logger = core.Log.WithName("readiness")

func NewReporter(localIPAddr string, localListenPort uint32) *Reporter {
return &Reporter{
localListenPort: localListenPort,
localListenAddr: localIPAddr,
}
}

func (r *Reporter) Start(stop <-chan struct{}) error {
protocol := "tcp"
addr := r.localListenAddr
if govalidator.IsIPv6(addr) {
protocol = "tcp6"
addr = fmt.Sprintf("[%s]", addr)
}
lis, err := net.Listen(protocol, fmt.Sprintf("%s:%d", addr, r.localListenPort))
if err != nil {
return err
}

defer func() {
_ = lis.Close()
}()

logger.Info("starting readiness reporter", "addr", lis.Addr().String())

mux := http.NewServeMux()
mux.HandleFunc(pathPrefixReady, r.handleReadiness)
server := &http.Server{
ReadHeaderTimeout: time.Second,
Handler: mux,
ErrorLog: adapter.ToStd(logger),
}

errCh := make(chan error)
go func() {
if err := server.Serve(lis); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return err
case <-stop:
logger.Info("stopping readiness reporter")
return server.Shutdown(context.Background())
}
}

func (r *Reporter) Terminating() {
r.isTerminating.Store(true)
}

func (r *Reporter) handleReadiness(writer http.ResponseWriter, req *http.Request) {
state := stateReady
stateHTTPStatus := http.StatusOK
if r.isTerminating.Load() {
state = stateTerminating
stateHTTPStatus = http.StatusServiceUnavailable
}

stateBytes := []byte(state)
writer.Header().Set("content-type", "text/plain")
writer.Header().Set("content-length", fmt.Sprintf("%d", len(stateBytes)))
writer.Header().Set("cache-control", "no-cache, max-age=0")
writer.Header().Set("x-powered-by", "kuma-dp")
writer.WriteHeader(stateHTTPStatus)
_, err := writer.Write(stateBytes)
logger.V(1).Info("responding readiness state", "state", state, "client", req.RemoteAddr)
if err != nil {
logger.Info("[WARNING] could not write response", "err", err)
}
}

func (r *Reporter) NeedLeaderElection() bool {
return false
}

var _ component.Component = &Reporter{}
4 changes: 4 additions & 0 deletions mk/k3d.mk
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ PORT_PREFIX := $$(($(patsubst 300-%,300+%-1,$(KIND_CLUSTER_NAME:kuma%=300%))))
K3D_NETWORK_CNI ?= flannel
K3D_CLUSTER_CREATE_OPTS ?= -i rancher/k3s:$(CI_K3S_VERSION) \
--k3s-arg '--disable=traefik@server:0' \
<<<<<<< HEAD
--k3s-arg '--disable=metrics-server@server:0' \
=======
--k3s-arg '--kubelet-arg=image-gc-high-threshold=100@server:0' \
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
--k3s-arg '--disable=servicelb@server:0' \
--volume '$(subst @,\@,$(TOP)/$(KUMA_DIR))/test/framework/deployments:/tmp/deployments@server:0' \
--network kind \
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/app/kuma-dp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ var DefaultConfig = func() Config {
},
},
Dataplane: Dataplane{
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
ReadinessPort: 9902,
},
DataplaneRuntime: DataplaneRuntime{
BinaryPath: "envoy",
Expand Down Expand Up @@ -133,6 +134,8 @@ type Dataplane struct {
ProxyType string `json:"proxyType,omitempty" envconfig:"kuma_dataplane_proxy_type"`
// Drain time for listeners.
DrainTime config_types.Duration `json:"drainTime,omitempty" envconfig:"kuma_dataplane_drain_time"`
// Port that exposes kuma-dp readiness status on localhost, set this value to 0 to provide readiness by "/ready" endpoint from Envoy adminAPI
ReadinessPort uint32 `json:"readinessPort,omitempty" envconfig:"kuma_readiness_port"`
}

func (d *Dataplane) PostProcess() error {
Expand Down Expand Up @@ -304,6 +307,10 @@ func (d *Dataplane) Validate() error {
errs = multierr.Append(errs, errors.Errorf(".DrainTime must be positive"))
}

if d.ReadinessPort > 65353 {
return errors.New(".ReadinessPort has to be in [0, 65353] range")
}

return errs
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/app/kuma-dp/testdata/default-config.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ controlPlane:
dataplane:
drainTime: 30s
proxyType: dataplane
readinessPort: 9902
dataplaneRuntime:
binaryPath: envoy
dynamicConfiguration:
Expand Down
25 changes: 25 additions & 0 deletions pkg/core/xds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
// Supported Envoy node metadata fields.
FieldDataplaneAdminPort = "dataplane.admin.port"
FieldDataplaneAdminAddress = "dataplane.admin.address"
FieldDataplaneReadinessPort = "dataplane.readinessReporter.port"
FieldDataplaneDNSPort = "dataplane.dns.port"
FieldDataplaneDNSEmptyPort = "dataplane.dns.empty.port"
FieldDataplaneDataplaneResource = "dataplane.resource"
Expand Down Expand Up @@ -51,6 +52,7 @@ const (
// This way, xDS server will be able to use Envoy node metadata
// to generate xDS resources that depend on environment-specific configuration.
type DataplaneMetadata struct {
<<<<<<< HEAD
Resource model.Resource
AdminPort uint32
AdminAddress string
Expand All @@ -65,6 +67,21 @@ type DataplaneMetadata struct {
MetricsSocketPath string
MetricsCertPath string
MetricsKeyPath string
=======
Resource model.Resource
AdminPort uint32
AdminAddress string
ReadinessPort uint32
DNSPort uint32
DynamicMetadata map[string]string
ProxyType mesh_proto.ProxyType
Version *mesh_proto.Version
Features Features
WorkDir string
MetricsCertPath string
MetricsKeyPath string
SystemCaPath string
>>>>>>> 20208eb60 (feat(kuma-dp): add a separate component to handle kuma-sidecar readiness probes (#11107))
}

// GetDataplaneResource returns the underlying DataplaneResource, if present.
Expand Down Expand Up @@ -117,6 +134,13 @@ func (m *DataplaneMetadata) GetAdminPort() uint32 {
return m.AdminPort
}

func (m *DataplaneMetadata) GetReadinessPort() uint32 {
if m == nil {
return 0
}
return m.ReadinessPort
}

func (m *DataplaneMetadata) GetAdminAddress() string {
if m == nil {
return ""
Expand Down Expand Up @@ -165,6 +189,7 @@ func DataplaneMetadataFromXdsMetadata(xdsMetadata *structpb.Struct, tmpDir strin
}
metadata.AdminPort = uint32Metadata(xdsMetadata, FieldDataplaneAdminPort)
metadata.AdminAddress = xdsMetadata.Fields[FieldDataplaneAdminAddress].GetStringValue()
metadata.ReadinessPort = uint32Metadata(xdsMetadata, FieldDataplaneReadinessPort)
metadata.DNSPort = uint32Metadata(xdsMetadata, FieldDataplaneDNSPort)
metadata.EmptyDNSPort = uint32Metadata(xdsMetadata, FieldDataplaneDNSEmptyPort)
if value := xdsMetadata.Fields[FieldDataplaneDataplaneResource]; value != nil {
Expand Down
Loading

0 comments on commit 5f19089

Please sign in to comment.