Skip to content

Commit

Permalink
Merge pull request #2492 from crazy-max/k8s-timeout
Browse files Browse the repository at this point in the history
k8s: opt to customize timeout during deployment
  • Loading branch information
crazy-max authored Jun 3, 2024
2 parents bc83ecb + f30e143 commit 947d602
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 40 deletions.
31 changes: 19 additions & 12 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/docker/buildx/store"
"github.com/docker/buildx/util/platformutil"
"github.com/docker/buildx/util/progress"
"github.com/docker/go-units"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -50,6 +51,7 @@ type Driver struct {
configMapClient clientcorev1.ConfigMapInterface
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
}

func (d *Driver) IsMobyDriver() bool {
Expand Down Expand Up @@ -88,7 +90,7 @@ func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
}
}
return sub.Wrap(
fmt.Sprintf("waiting for %d pods to be ready", d.minReplicas),
fmt.Sprintf("waiting for %d pods to be ready, timeout: %s", d.minReplicas, units.HumanDuration(d.timeout)),
func() error {
return d.wait(ctx)
})
Expand All @@ -101,22 +103,27 @@ func (d *Driver) wait(ctx context.Context) error {
err error
depl *appsv1.Deployment
)
for try := 0; try < 100; try++ {
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
if err == nil {
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
return nil
}
err = errors.Errorf("expected %d replicas to be ready, got %d",
d.minReplicas, depl.Status.ReadyReplicas)
}

timeoutChan := time.After(d.timeout)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(100+try*20) * time.Millisecond):
case <-timeoutChan:
return err
case <-ticker.C:
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
if err == nil {
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
return nil
}
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, depl.Status.ReadyReplicas)
}
}
}
return err
}

func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
Expand Down
43 changes: 27 additions & 16 deletions driver/kubernetes/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"

Expand All @@ -16,8 +17,11 @@ import (
"k8s.io/client-go/kubernetes"
)

const prioritySupported = 40
const priorityUnsupported = 80
const (
prioritySupported = 40
priorityUnsupported = 80
defaultTimeout = 120 * time.Second
)

func init() {
driver.Register(&factory{})
Expand Down Expand Up @@ -68,12 +72,13 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
clientset: clientset,
}

deploymentOpt, loadbalance, namespace, defaultLoad, err := f.processDriverOpts(deploymentName, namespace, cfg)
deploymentOpt, loadbalance, namespace, defaultLoad, timeout, err := f.processDriverOpts(deploymentName, namespace, cfg)
if nil != err {
return nil, err
}

d.defaultLoad = defaultLoad
d.timeout = timeout

d.deployment, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
if err != nil {
Expand Down Expand Up @@ -102,7 +107,7 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
return d, nil
}

func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg driver.InitConfig) (*manifest.DeploymentOpt, string, string, bool, error) {
func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg driver.InitConfig) (*manifest.DeploymentOpt, string, string, bool, time.Duration, error) {
deploymentOpt := &manifest.DeploymentOpt{
Name: deploymentName,
Image: bkimage.DefaultImage,
Expand All @@ -114,6 +119,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}

defaultLoad := false
timeout := defaultTimeout

deploymentOpt.Qemu.Image = bkimage.QemuImage

Expand All @@ -131,7 +137,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case "replicas":
deploymentOpt.Replicas, err = strconv.Atoi(v)
if err != nil {
return nil, "", "", false, err
return nil, "", "", false, 0, err
}
case "requests.cpu":
deploymentOpt.RequestsCPU = v
Expand All @@ -148,7 +154,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case "rootless":
deploymentOpt.Rootless, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, err
return nil, "", "", false, 0, err
}
if _, isImage := cfg.DriverOpts["image"]; !isImage {
deploymentOpt.Image = bkimage.DefaultRootlessImage
Expand All @@ -160,17 +166,17 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case "nodeselector":
deploymentOpt.NodeSelector, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, errors.Wrap(err, "cannot parse node selector")
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse node selector")
}
case "annotations":
deploymentOpt.CustomAnnotations, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, errors.Wrap(err, "cannot parse annotations")
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse annotations")
}
case "labels":
deploymentOpt.CustomLabels, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, errors.Wrap(err, "cannot parse labels")
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse labels")
}
case "tolerations":
ts := strings.Split(v, ";")
Expand All @@ -195,12 +201,12 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case "tolerationSeconds":
c, err := strconv.Atoi(kv[1])
if nil != err {
return nil, "", "", false, err
return nil, "", "", false, 0, err
}
c64 := int64(c)
t.TolerationSeconds = &c64
default:
return nil, "", "", false, errors.Errorf("invalid tolaration %q", v)
return nil, "", "", false, 0, errors.Errorf("invalid tolaration %q", v)
}
}
}
Expand All @@ -212,13 +218,13 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case LoadbalanceSticky:
case LoadbalanceRandom:
default:
return nil, "", "", false, errors.Errorf("invalid loadbalance %q", v)
return nil, "", "", false, 0, errors.Errorf("invalid loadbalance %q", v)
}
loadbalance = v
case "qemu.install":
deploymentOpt.Qemu.Install, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, err
return nil, "", "", false, 0, err
}
case "qemu.image":
if v != "" {
Expand All @@ -227,14 +233,19 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
case "default-load":
defaultLoad, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, err
return nil, "", "", false, 0, err
}
case "timeout":
timeout, err = time.ParseDuration(v)
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse timeout")
}
default:
return nil, "", "", false, errors.Errorf("invalid driver option %s for driver %s", k, DriverName)
return nil, "", "", false, 0, errors.Errorf("invalid driver option %s for driver %s", k, DriverName)
}
}

return deploymentOpt, loadbalance, namespace, defaultLoad, nil
return deploymentOpt, loadbalance, namespace, defaultLoad, timeout, nil
}

func splitMultiValues(in string, itemsep string, kvsep string) (map[string]string, error) {
Expand Down
39 changes: 27 additions & 12 deletions driver/kubernetes/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"testing"
"time"

"github.com/docker/buildx/driver"
"github.com/docker/buildx/driver/bkimage"
Expand Down Expand Up @@ -40,6 +41,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
"namespace": "test-ns",
"image": "test:latest",
"replicas": "2",
"timeout": "300s",
"requests.cpu": "100m",
"requests.memory": "32Mi",
"limits.cpu": "200m",
Expand All @@ -54,7 +56,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
"qemu.image": "qemu:latest",
"default-load": "true",
}
r, loadbalance, ns, defaultLoad, err := f.processDriverOpts(cfg.Name, "test", cfg)
r, loadbalance, ns, defaultLoad, timeout, err := f.processDriverOpts(cfg.Name, "test", cfg)

nodeSelectors := map[string]string{
"selector1": "value1",
Expand Down Expand Up @@ -104,14 +106,15 @@ func TestFactory_processDriverOpts(t *testing.T) {
require.True(t, r.Qemu.Install)
require.Equal(t, "qemu:latest", r.Qemu.Image)
require.True(t, defaultLoad)
require.Equal(t, 300*time.Second, timeout)
},
)

t.Run(
"NoOptions", func(t *testing.T) {
cfg.DriverOpts = map[string]string{}

r, loadbalance, ns, defaultLoad, err := f.processDriverOpts(cfg.Name, "test", cfg)
r, loadbalance, ns, defaultLoad, timeout, err := f.processDriverOpts(cfg.Name, "test", cfg)

require.NoError(t, err)

Expand All @@ -131,6 +134,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
require.False(t, r.Qemu.Install)
require.Equal(t, bkimage.QemuImage, r.Qemu.Image)
require.False(t, defaultLoad)
require.Equal(t, 120*time.Second, timeout)
},
)

Expand All @@ -141,7 +145,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
"loadbalance": "sticky",
}

r, loadbalance, ns, defaultLoad, err := f.processDriverOpts(cfg.Name, "test", cfg)
r, loadbalance, ns, defaultLoad, timeout, err := f.processDriverOpts(cfg.Name, "test", cfg)

require.NoError(t, err)

Expand All @@ -161,6 +165,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
require.False(t, r.Qemu.Install)
require.Equal(t, bkimage.QemuImage, r.Qemu.Image)
require.False(t, defaultLoad)
require.Equal(t, 120*time.Second, timeout)
},
)

Expand All @@ -169,7 +174,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"replicas": "invalid",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -179,7 +184,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"rootless": "invalid",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -189,7 +194,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"tolerations": "key=foo,value=bar,invalid=foo2",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -199,7 +204,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"tolerations": "key=foo,value=bar,tolerationSeconds=invalid",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -209,7 +214,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"annotations": "key,value",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -219,7 +224,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"labels": "key=value=foo",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -229,7 +234,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"loadbalance": "invalid",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -239,7 +244,7 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"qemu.install": "invalid",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand All @@ -249,7 +254,17 @@ func TestFactory_processDriverOpts(t *testing.T) {
cfg.DriverOpts = map[string]string{
"invalid": "foo",
}
_, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)

t.Run(
"InvalidTimeout", func(t *testing.T) {
cfg.DriverOpts = map[string]string{
"timeout": "invalid",
}
_, _, _, _, _, err := f.processDriverOpts(cfg.Name, "test", cfg)
require.Error(t, err)
},
)
Expand Down

0 comments on commit 947d602

Please sign in to comment.