Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposes the endpointSliceWatchTimeout value #53

Merged
merged 8 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions deployments/helm/hephaestus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ controller:
# Defaults to "10m"
poolMaxIdleTime: null

# Duration after which buildkit statefulset scale watch will be terminated
# Defaults to "60s"
poolWatchTimeout: null
# Duration the worker pool will wait for a buildkit pod to become ready for traffic
# Integer number of seconds (not a duration string); defaults to 180
poolEndpointWatchTimeout: null

# Secrets (name: path) to expose into builds that request it
secrets: {}
Expand Down
60 changes: 30 additions & 30 deletions pkg/buildkit/worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ type Pool interface {
var (
ErrNoUnleasedPods = errors.New("no unleased pods found")

newUUID = uuid.NewUUID
statefulPodRegex = regexp.MustCompile(`^.*-(\d+)$`)
endpointSliceWatchTimeout = pointer.Int64(90)
newUUID = uuid.NewUUID
statefulPodRegex = regexp.MustCompile(`^.*-(\d+)$`)
)

const (
Expand Down Expand Up @@ -74,8 +73,9 @@ type workerPool struct {
podClient corev1typed.PodInterface
endpointSliceClient discoveryv1beta1typed.EndpointSliceInterface

podListOptions metav1.ListOptions
endpointSliceListOptions metav1.ListOptions
podListOptions metav1.ListOptions
endpointSliceListOptions metav1.ListOptions
endpointSliceWatchTimeout int64

// endpoints discovery
serviceName string
Expand All @@ -101,23 +101,24 @@ func NewPool(ctx context.Context, clientset kubernetes.Interface, conf config.Bu

ctx, cancel := context.WithCancel(ctx)
wp := &workerPool{
ctx: ctx,
cancel: cancel,
log: o.Log,
poolSyncTime: o.SyncWaitTime,
podMaxIdleTime: o.MaxIdleTime,
uuid: string(newUUID()),
requests: NewRequestQueue(),
notifyReconcile: make(chan struct{}, 1),
podClient: clientset.CoreV1().Pods(conf.Namespace),
endpointSliceClient: clientset.DiscoveryV1beta1().EndpointSlices(conf.Namespace),
podListOptions: podListOptions,
endpointSliceListOptions: endpointSliceListOptions,
serviceName: conf.ServiceName,
servicePort: conf.DaemonPort,
statefulSetName: conf.StatefulSetName,
statefulSetClient: clientset.AppsV1().StatefulSets(conf.Namespace),
namespace: conf.Namespace,
ctx: ctx,
cancel: cancel,
log: o.Log,
poolSyncTime: o.SyncWaitTime,
podMaxIdleTime: o.MaxIdleTime,
endpointSliceWatchTimeout: o.EndpointWatchTimeoutSeconds,
uuid: string(newUUID()),
requests: NewRequestQueue(),
notifyReconcile: make(chan struct{}, 1),
podClient: clientset.CoreV1().Pods(conf.Namespace),
endpointSliceClient: clientset.DiscoveryV1beta1().EndpointSlices(conf.Namespace),
podListOptions: podListOptions,
endpointSliceListOptions: endpointSliceListOptions,
serviceName: conf.ServiceName,
servicePort: conf.DaemonPort,
statefulSetName: conf.StatefulSetName,
statefulSetClient: clientset.AppsV1().StatefulSets(conf.Namespace),
namespace: conf.Namespace,
}

wp.log.Info("Starting worker pod monitor", "syncTime", wp.poolSyncTime.String())
Expand Down Expand Up @@ -268,7 +269,7 @@ func (p *workerPool) buildEndpointURL(ctx context.Context, podName string) (stri

watchOpts := metav1.ListOptions{
LabelSelector: p.endpointSliceListOptions.LabelSelector,
TimeoutSeconds: endpointSliceWatchTimeout,
TimeoutSeconds: &p.endpointSliceWatchTimeout,
}
watcher, err := p.endpointSliceClient.Watch(ctx, watchOpts)
if err != nil {
Expand All @@ -289,7 +290,7 @@ func (p *workerPool) buildEndpointURL(ctx context.Context, podName string) (stri
p.log.Info("Finished watching endpoints", "podName", podName, "duration", time.Since(start))

if hostname == "" {
return "", fmt.Errorf("failed to extract hostname after %d seconds", *endpointSliceWatchTimeout)
return "", fmt.Errorf("failed to extract hostname after %d seconds", p.endpointSliceWatchTimeout)
}

u, err := url.ParseRequestURI(fmt.Sprintf("tcp://%s:%d", hostname, p.servicePort))
Expand Down Expand Up @@ -375,8 +376,7 @@ func (p *workerPool) updateWorkers(ctx context.Context) error {
if req := p.requests.Dequeue(); req != nil {
log.Info("Found pending pod request, processing")

pod := pod
if p.processPodRequest(ctx, req, &pod) {
if p.processPodRequest(ctx, req, pod) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just passing the pod value instead of a pointer to avoid implicit memory aliasing and variable re-declaration.

leased = append(leased, pod.Name)
}
continue
Expand Down Expand Up @@ -449,11 +449,11 @@ func (p *workerPool) updateWorkers(ctx context.Context) error {
}

// attempts to lease a pod, build an endpoint URL, and provide a request result
func (p *workerPool) processPodRequest(ctx context.Context, req *PodRequest, pod *corev1.Pod) (success bool) {
func (p *workerPool) processPodRequest(ctx context.Context, req *PodRequest, pod corev1.Pod) (success bool) {
log := p.log.WithValues("podName", pod.Name)

log.Info("Attempting to lease pod")
if err := p.leasePod(ctx, pod, req.owner); err != nil {
if err := p.leasePod(ctx, &pod, req.owner); err != nil {
log.Error(err, "Failed to lease pod")

req.result <- PodRequestResult{err: err}
Expand All @@ -466,8 +466,8 @@ func (p *workerPool) processPodRequest(ctx context.Context, req *PodRequest, pod
if err != nil {
log.Error(err, "Failed to build routable URL")

if rErr := p.releasePod(ctx, pod); rErr != nil {
log.Error(err, "Failed to release pod")
if rErr := p.releasePod(ctx, &pod); rErr != nil {
log.Error(rErr, "Failed to release pod")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was logging the wrong error.

}

req.result <- PodRequestResult{err: err}
Expand Down
20 changes: 10 additions & 10 deletions pkg/buildkit/worker/pooloption.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (
)

var defaultOpts = Options{
Log: logr.Discard(),
SyncWaitTime: 30 * time.Second,
MaxIdleTime: 10 * time.Minute,
WatchTimeoutSeconds: 60,
Log: logr.Discard(),
SyncWaitTime: 30 * time.Second,
MaxIdleTime: 10 * time.Minute,
EndpointWatchTimeoutSeconds: 180,
}

type Options struct {
Log logr.Logger
MaxIdleTime time.Duration
SyncWaitTime time.Duration
WatchTimeoutSeconds int64
Log logr.Logger
MaxIdleTime time.Duration
SyncWaitTime time.Duration
EndpointWatchTimeoutSeconds int64
}

type PoolOption func(o Options) Options
Expand All @@ -36,9 +36,9 @@ func MaxIdleTime(d time.Duration) PoolOption {
}
}

func WatchTimeoutSeconds(s int64) PoolOption {
func EndpointWatchTimeoutSeconds(s int64) PoolOption {
return func(o Options) Options {
o.WatchTimeoutSeconds = s
o.EndpointWatchTimeoutSeconds = s
return o
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/buildkit/worker/pooloption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

func TestPoolOptions(t *testing.T) {
opts := Options{
Log: logr.Logger{},
MaxIdleTime: 0,
SyncWaitTime: 0,
WatchTimeoutSeconds: 0,
Log: logr.Logger{},
MaxIdleTime: 0,
SyncWaitTime: 0,
EndpointWatchTimeoutSeconds: 0,
}

opts = SyncWaitTime(10 * time.Minute)(opts)
Expand All @@ -22,8 +22,8 @@ func TestPoolOptions(t *testing.T) {
opts = MaxIdleTime(30 * time.Minute)(opts)
assert.Equal(t, 30*time.Minute, opts.MaxIdleTime)

opts = WatchTimeoutSeconds(300)(opts)
assert.Equal(t, int64(300), opts.WatchTimeoutSeconds)
opts = EndpointWatchTimeoutSeconds(300)(opts)
assert.Equal(t, int64(300), opts.EndpointWatchTimeoutSeconds)

opts = Logger(logr.Discard())(opts)
assert.Equal(t, logr.Discard(), opts.Log)
Expand Down
40 changes: 26 additions & 14 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,41 @@ type Manager struct {
EnableLeaderElection bool `json:"enableLeaderElection" yaml:"enableLeaderElection"`
}

// Buildkit communication and discovery configuration.
type Buildkit struct {
Namespace string `json:"namespace" yaml:"namespace"`
PodLabels map[string]string `json:"podLabels" yaml:"podLabels"`
DaemonPort int32 `json:"daemonPort" yaml:"daemonPort"`
ServiceName string `json:"serviceName" yaml:"serviceName"`
StatefulSetName string `json:"statefulSetName" yaml:"statefulSetName"`

Secrets map[string]string `json:"secrets" yaml:"secrets,omitempty"`

Registries map[string]RegistryConfig `json:"registries,omitempty" yaml:"registries,omitempty"`

// Namespace where the StatefulSet is deployed.
Namespace string `json:"namespace" yaml:"namespace"`
// PodLabels assigned to pods by the StatefulSet.
PodLabels map[string]string `json:"podLabels" yaml:"podLabels"`
// DaemonPort used to communicate with buildkitd over gRPC.
DaemonPort int32 `json:"daemonPort" yaml:"daemonPort"`
// ServiceName for the headless service.
ServiceName string `json:"serviceName" yaml:"serviceName"`
// StatefulSetName for the supervising workload.
StatefulSetName string `json:"statefulSetName" yaml:"statefulSetName"`
// PoolSyncWaitTime controls how often the worker pool is reconciled.
PoolSyncWaitTime *time.Duration `json:"poolSyncWaitTime" yaml:"poolSyncWaitTime"`
PoolMaxIdleTime *time.Duration `json:"poolMaxIdleTime" yaml:"poolMaxIdleTime"`
PoolWatchTimeout *int64 `json:"poolWatchTimeout" yaml:"poolWatchTimeout"`

// PoolMaxIdleTime controls how long a pod will be allowed to remain unleased before it's terminated.
PoolMaxIdleTime *time.Duration `json:"poolMaxIdleTime" yaml:"poolMaxIdleTime"`
// PoolEndpointWatchTimeout is the time limit, in seconds, used when waiting for new pods to become "ready" for traffic.
PoolEndpointWatchTimeout *int64 `json:"poolEndpointWatchTimeout" yaml:"poolEndpointWatchTimeout"`
// MTLS parameters.
MTLS *BuildkitMTLS `json:"mtls,omitempty" yaml:"mtls,omitempty"`
// Secrets provided to buildkitd during the build process.
Secrets map[string]string `json:"secrets" yaml:"secrets,omitempty"`
// Registries parameters.
Registries map[string]RegistryConfig `json:"registries,omitempty" yaml:"registries,omitempty"`
}

// RegistryConfig options used to relax registry push/pull restrictions.
type RegistryConfig struct {
// Insecure will allow self-signed certificates.
Insecure bool `json:"insecure,omitempty" yaml:"insecure,omitempty"`
HTTP bool `json:"http,omitempty" yaml:"http,omitempty"`
// HTTP will allow non-TLS connections.
HTTP bool `json:"http,omitempty" yaml:"http,omitempty"`
}

// BuildkitMTLS server configuration.
type BuildkitMTLS struct {
CACertPath string `json:"caCertPath" yaml:"caCertPath"`
CertPath string `json:"certPath" yaml:"certPath"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ func createWorkerPool(
poolOpts = append(poolOpts, worker.SyncWaitTime(*swt))
}

if wt := cfg.PoolWatchTimeout; wt != nil {
poolOpts = append(poolOpts, worker.WatchTimeoutSeconds(*wt))
if wt := cfg.PoolEndpointWatchTimeout; wt != nil {
poolOpts = append(poolOpts, worker.EndpointWatchTimeoutSeconds(*wt))
}

clientset, err := kubernetes.Clientset(mgr.GetConfig())
Expand Down