diff --git a/apis/config/v1alpha2/configuration_types.go b/apis/config/v1alpha2/configuration_types.go index 6ca46c6e0b..fedb2c494d 100644 --- a/apis/config/v1alpha2/configuration_types.go +++ b/apis/config/v1alpha2/configuration_types.go @@ -51,6 +51,10 @@ type Configuration struct { // This is achieved by blocking the start of new jobs until the previously // started job has all pods running (ready). WaitForPodsReady *WaitForPodsReady `json:"waitForPodsReady,omitempty"` + + // ClientConnection provides additional configuration options for Kubernetes + // API server client. + ClientConnection *ClientConnection `json:"clientConnection,omitempty"` } type WaitForPodsReady struct { @@ -77,3 +81,12 @@ type InternalCertManagement struct { // Defaults to kueue-webhook-server-cert. WebhookSecretName *string `json:"webhookSecretName,omitempty"` } + +type ClientConnection struct { + // QPS controls the number of queries per second allowed for K8S api server + // connection. + QPS *float32 `json:"qps,omitempty"` + + // Burst allows extra queries to accumulate when a client is exceeding its rate. + Burst *int32 `json:"burst,omitempty"` +} diff --git a/apis/config/v1alpha2/defaults.go b/apis/config/v1alpha2/defaults.go index e3c2b45c44..e7f54748f5 100644 --- a/apis/config/v1alpha2/defaults.go +++ b/apis/config/v1alpha2/defaults.go @@ -29,6 +29,8 @@ const ( DefaultHealthProbeBindAddress = ":8081" DefaultMetricsBindAddress = ":8080" DefaultLeaderElectionID = "c1f6bfd2.kueue.x-k8s.io" + DefaultClientConnectionQPS = 20.0 + DefaultClientConnectionBurst = 30 ) func addDefaultingFuncs(scheme *runtime.Scheme) error { @@ -70,4 +72,13 @@ func SetDefaults_Configuration(cfg *Configuration) { cfg.InternalCertManagement.WebhookSecretName = pointer.String(DefaultWebhookSecretName) } } + if cfg.ClientConnection == nil { + cfg.ClientConnection = &ClientConnection{} + } + if cfg.ClientConnection.QPS == nil { + cfg.ClientConnection.QPS = pointer.Float32(DefaultClientConnectionQPS) + } + if cfg.ClientConnection.Burst == nil { + cfg.ClientConnection.Burst = pointer.Int32(DefaultClientConnectionBurst) + } } diff --git a/apis/config/v1alpha2/defaults_test.go b/apis/config/v1alpha2/defaults_test.go index f7922ad6a2..34c4ee8a92 100644 --- a/apis/config/v1alpha2/defaults_test.go +++ b/apis/config/v1alpha2/defaults_test.go @@ -45,6 +45,10 @@ func TestSetDefaults_Configuration(t *testing.T) { HealthProbeBindAddress: DefaultHealthProbeBindAddress, }, } + defaultClientConnection := &ClientConnection{ + QPS: pointer.Float32(DefaultClientConnectionQPS), + Burst: pointer.Int32(DefaultClientConnectionBurst), + } testCases := map[string]struct { original *Configuration @@ -62,6 +66,7 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, }, }, "defaulting ControllerManagerConfigurationSpec": { @@ -95,6 +100,7 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, }, }, "should not default ControllerManagerConfigurationSpec": { @@ -138,6 +144,7 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, }, }, "should not set LeaderElectionID": { @@ -170,6 +177,7 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, }, }, "defaulting InternalCertManagement": { @@ -184,6 +192,7 @@ func TestSetDefaults_Configuration(t *testing.T) { WebhookServiceName: pointer.String(DefaultWebhookServiceName), WebhookSecretName: pointer.String(DefaultWebhookSecretName), }, + ClientConnection: defaultClientConnection, }, }, "should not default InternalCertManagement": { @@ -199,6 +208,47 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, + }, + }, + "should not default values in custom ClientConnection": { + original: &Configuration{ + Namespace: pointer.String(overwriteNamespace), + InternalCertManagement: &InternalCertManagement{ + Enable: pointer.Bool(false), + }, + ClientConnection: &ClientConnection{ + QPS: pointer.Float32(123.0), + Burst: pointer.Int32(456), + }, + }, + want: &Configuration{ + Namespace: pointer.String(overwriteNamespace), + ControllerManagerConfigurationSpec: defaultCtrlManagerConfigurationSpec, + InternalCertManagement: &InternalCertManagement{ + Enable: pointer.Bool(false), + }, + ClientConnection: &ClientConnection{ + QPS: pointer.Float32(123.0), + Burst: pointer.Int32(456), + }, + }, + }, + "should default empty custom ClientConnection": { + original: &Configuration{ + Namespace: pointer.String(overwriteNamespace), + InternalCertManagement: &InternalCertManagement{ + Enable: pointer.Bool(false), + }, + ClientConnection: &ClientConnection{}, + }, + want: &Configuration{ + Namespace: pointer.String(overwriteNamespace), + ControllerManagerConfigurationSpec: defaultCtrlManagerConfigurationSpec, + InternalCertManagement: &InternalCertManagement{ + Enable: pointer.Bool(false), + }, + ClientConnection: defaultClientConnection, }, }, } diff --git a/apis/config/v1alpha2/zz_generated.deepcopy.go b/apis/config/v1alpha2/zz_generated.deepcopy.go index 999e0202e2..d2939b43f8 100644 --- a/apis/config/v1alpha2/zz_generated.deepcopy.go +++ b/apis/config/v1alpha2/zz_generated.deepcopy.go @@ -25,6 +25,31 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientConnection) DeepCopyInto(out *ClientConnection) { + *out = *in + if in.QPS != nil { + in, out := &in.QPS, &out.QPS + *out = new(float32) + **out = **in + } + if in.Burst != nil { + in, out := &in.Burst, &out.Burst + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientConnection. +func (in *ClientConnection) DeepCopy() *ClientConnection { + if in == nil { + return nil + } + out := new(ClientConnection) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Configuration) DeepCopyInto(out *Configuration) { *out = *in @@ -45,6 +70,11 @@ func (in *Configuration) DeepCopyInto(out *Configuration) { *out = new(WaitForPodsReady) **out = **in } + if in.ClientConnection != nil { + in, out := &in.ClientConnection, &out.ClientConnection + *out = new(ClientConnection) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration. diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml index c7ce397522..99fc378835 100644 --- a/config/components/manager/controller_manager_config.yaml +++ b/config/components/manager/controller_manager_config.yaml @@ -16,6 +16,9 @@ controller: ClusterQueue.kueue.x-k8s.io: 3 ResourceFlavor.kueue.x-k8s.io: 3 Workload.kueue.x-k8s.io: 3 +clientConnection: + qps: 50 + burst: 100 #waitForPodsReady: # enable: true #manageJobsWithoutQueueName: true diff --git a/main.go b/main.go index 1fac264d9e..84be2143a5 100644 --- a/main.go +++ b/main.go @@ -92,7 +92,9 @@ func main() { if kubeConfig.UserAgent == "" { kubeConfig.UserAgent = useragent.Default() } - + kubeConfig.QPS = *cfg.ClientConnection.QPS + kubeConfig.Burst = int(*cfg.ClientConnection.Burst) + setupLog.V(2).Info("K8S Client", "qps", kubeConfig.QPS, "burst", kubeConfig.Burst) mgr, err := ctrl.NewManager(kubeConfig, options) if err != nil { setupLog.Error(err, "Unable to start manager") diff --git a/main_test.go b/main_test.go index 394aa21d68..71d32ac969 100644 --- a/main_test.go +++ b/main_test.go @@ -153,6 +153,27 @@ webhook: t.Fatal(err) } + clientConnectionConfig := filepath.Join(tmpDir, "clientConnection.yaml") + if err := os.WriteFile(clientConnectionConfig, []byte(` +apiVersion: config.kueue.x-k8s.io/v1alpha2 +kind: Configuration +namespace: kueue-system +health: + healthProbeBindAddress: :8081 +metrics: + bindAddress: :8080 +leaderElection: + leaderElect: true + resourceName: c1f6bfd2.kueue.x-k8s.io +webhook: + port: 9443 +clientConnection: + qps: 50 + burst: 100 +`), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + defaultControlOptions := ctrl.Options{ Port: config.DefaultWebhookPort, HealthProbeBindAddress: config.DefaultHealthProbeBindAddress, @@ -176,6 +197,11 @@ webhook: cmpopts.IgnoreFields(config.Configuration{}, "ControllerManagerConfigurationSpec"), } + defaultClientConnection := &config.ClientConnection{ + QPS: pointer.Float32(config.DefaultClientConnectionQPS), + Burst: pointer.Int32(config.DefaultClientConnectionBurst), + } + testcases := []struct { name string configFile string @@ -188,6 +214,7 @@ webhook: wantConfiguration: config.Configuration{ Namespace: pointer.String(config.DefaultNamespace), InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, }, wantOptions: ctrl.Options{ Port: config.DefaultWebhookPort, @@ -208,6 +235,7 @@ webhook: Namespace: pointer.String("kueue-tenant-a"), ManageJobsWithoutQueueName: false, InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, }, wantOptions: defaultControlOptions, }, @@ -222,6 +250,7 @@ webhook: Namespace: pointer.String(config.DefaultNamespace), ManageJobsWithoutQueueName: false, InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: ":38081", @@ -246,6 +275,7 @@ webhook: WebhookServiceName: pointer.String("kueue-tenant-a-webhook-service"), WebhookSecretName: pointer.String("kueue-tenant-a-webhook-server-cert"), }, + ClientConnection: defaultClientConnection, }, wantOptions: defaultControlOptions, }, @@ -262,6 +292,7 @@ webhook: InternalCertManagement: &config.InternalCertManagement{ Enable: pointer.Bool(false), }, + ClientConnection: defaultClientConnection, }, wantOptions: defaultControlOptions, }, @@ -276,6 +307,7 @@ webhook: Namespace: pointer.String("kueue-system"), ManageJobsWithoutQueueName: false, InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, }, wantOptions: ctrl.Options{ Port: config.DefaultWebhookPort, @@ -299,6 +331,25 @@ webhook: WaitForPodsReady: &config.WaitForPodsReady{ Enable: true, }, + ClientConnection: defaultClientConnection, + }, + wantOptions: defaultControlOptions, + }, + { + name: "clientConnection config", + configFile: clientConnectionConfig, + wantConfiguration: config.Configuration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: config.GroupVersion.String(), + Kind: "Configuration", + }, + Namespace: pointer.String(config.DefaultNamespace), + ManageJobsWithoutQueueName: false, + InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: &config.ClientConnection{ + QPS: pointer.Float32(50), + Burst: pointer.Int32(100), + }, }, wantOptions: defaultControlOptions, },