Skip to content

Introduce flags for fine-tuning maximum concurrent reconciles per resource #141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 102 additions & 52 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,26 @@ import (
)

const (
flagEnableLeaderElection = "enable-leader-election"
flagLeaderElectionNamespace = "leader-election-namespace"
flagMetricAddr = "metrics-addr"
flagHealthzAddr = "healthz-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
envVarAWSRegion = "AWS_REGION"
flagEnableLeaderElection = "enable-leader-election"
flagLeaderElectionNamespace = "leader-election-namespace"
flagMetricAddr = "metrics-addr"
flagHealthzAddr = "healthz-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
flagReconcileDefaultMaxConcurrency = "reconcile-default-max-concurrent-syncs"
flagReconcileResourceMaxConcurrency = "reconcile-resource-max-concurrent-syncs"
envVarAWSRegion = "AWS_REGION"
)

var (
Expand All @@ -74,24 +76,26 @@ var (

// Config contains configuration options for ACK service controllers
type Config struct {
MetricsAddr string
HealthzAddr string
EnableLeaderElection bool
LeaderElectionNamespace string
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
ReconcileDefaultResyncSeconds int
ReconcileResourceResyncSeconds []string
MetricsAddr string
HealthzAddr string
EnableLeaderElection bool
LeaderElectionNamespace string
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
ReconcileDefaultResyncSeconds int
ReconcileResourceResyncSeconds []string
ReconcileDefaultMaxConcurrency int
ReconcileResourceMaxConcurrency []string
}

// BindFlags defines CLI/runtime configuration options
Expand Down Expand Up @@ -202,6 +206,19 @@ func (cfg *Config) BindFlags() {
" configuration maps resource kinds to drift remediation periods in seconds. If provided, "+
" resource-specific resync periods take precedence over the default period.",
)
flag.IntVar(
&cfg.ReconcileDefaultMaxConcurrency, flagReconcileDefaultMaxConcurrency,
1,
"The default maximum number of concurrent reconciles for a resource reconciler. This value is used if no "+
"resource-specific override has been specified. Default is 1.",
)
Comment on lines +209 to +214
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will maintain the default maximum reconciles at 1

flag.StringArrayVar(
&cfg.ReconcileResourceMaxConcurrency, flagReconcileResourceMaxConcurrency,
[]string{},
"A Key/Value list of strings representing the reconcile max concurrency configuration for each resource. This"+
" configuration maps resource kinds to maximum number of concurrent reconciles. If provided, "+
" resource-specific max concurrency takes precedence over the default max concurrency.",
)
}

// SetupLogger initializes the logger used in the service controller
Expand All @@ -222,7 +239,6 @@ func (cfg *Config) SetupLogger() {
// SetAWSAccountID uses sts GetCallerIdentity API to find AWS AccountId and set
// in Config
func (cfg *Config) SetAWSAccountID() error {

awsCfg := aws.Config{}
if cfg.IdentityEndpointURL != "" {
awsCfg.Endpoint = aws.String(cfg.IdentityEndpointURL)
Expand Down Expand Up @@ -297,6 +313,9 @@ func (cfg *Config) Validate(options ...Option) error {
if cfg.ReconcileDefaultResyncSeconds < 0 {
return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds)
}
if cfg.ReconcileDefaultMaxConcurrency < 1 {
return fmt.Errorf("invalid value for flag '%s': max concurrency default must be greater than 0", flagReconcileDefaultMaxConcurrency)
}
return nil
}

Expand All @@ -309,28 +328,45 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error {
return nil
}

// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds flag
// by checking the resource names and their corresponding duration.
// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds and
// --reconcile-resource-max-concurrent-syncs flags. It ensures that the resource names provided
// in the flags are valid and managed by the controller.
func (cfg *Config) validateReconcileConfigResources(supportedGVKs []schema.GroupVersionKind) error {
validResourceNames := []string{}
for _, gvk := range supportedGVKs {
validResourceNames = append(validResourceNames, gvk.Kind)
}
for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds {
resourceName, _, err := parseReconcileFlagArgument(resourceResyncSecondsFlag)
if err != nil {
return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err)
for _, resourceFlagArgument := range cfg.ReconcileResourceResyncSeconds {
if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err)
}
if !ackutil.InStrings(resourceName, validResourceNames) {
return fmt.Errorf(
"error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v",
resourceResyncSecondsFlag, resourceName, strings.Join(validResourceNames, ", "),
)
}
for _, resourceFlagArgument := range cfg.ReconcileResourceMaxConcurrency {
if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceMaxConcurrency, err)
}
}
return nil
}

// validateReconcileConfigResource validates a single flag argument of any flag that is used to configure
// resource-specific reconcile settings. It ensures that the resource name is valid and managed by the
// controller, and that the value is a positive integer. If the flag argument is not in the expected format
// or has invalid elements, an error is returned.
func validateReconcileConfigResource(validResourceNames []string, resourceFlagArgument string) error {
resourceName, _, err := parseReconcileFlagArgument(resourceFlagArgument)
if err != nil {
return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: string=number", resourceFlagArgument, err)
}
if !ackutil.InStrings(resourceName, validResourceNames) {
return fmt.Errorf(
"error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v",
resourceFlagArgument, resourceName, strings.Join(validResourceNames, ", "),
)
}
return nil
}

// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds
// flag and returns a map that maps resource names to resync periods.
// The flag arguments are expected to have the format "resource=seconds", where "resource" is the
Expand All @@ -346,6 +382,20 @@ func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Durati
return resourceResyncPeriods, nil
}

// GetReconcileResourceMaxConcurrency returns the maximum number of concurrent reconciles for a
// given resource name. If the resource name is not found in the --reconcile-resource-max-concurrent-syncs
// flag, the function returns the default maximum concurrency value.
func (cfg *Config) GetReconcileResourceMaxConcurrency(resourceName string) int {
for _, resourceMaxConcurrencyFlag := range cfg.ReconcileResourceMaxConcurrency {
// Parse the resource name and max concurrency from the flag argument
name, maxConcurrency, _ := parseReconcileFlagArgument(resourceMaxConcurrencyFlag)
if strings.EqualFold(name, resourceName) {
return maxConcurrency
}
}
return cfg.ReconcileDefaultMaxConcurrency
}

// parseReconcileFlagArgument parses a flag argument of the form "key=value" into
// its individual elements. The key must be a non-empty string and the value must be
// a non-empty positive integer. If the flag argument is not in the expected format
Expand All @@ -365,14 +415,14 @@ func parseReconcileFlagArgument(flagArgument string) (string, int, error) {
return "", 0, fmt.Errorf("missing value in flag argument")
}

resyncSeconds, err := strconv.Atoi(elements[1])
value, err := strconv.Atoi(elements[1])
if err != nil {
return "", 0, fmt.Errorf("invalid value in flag argument: %v", err)
}
if resyncSeconds < 0 {
return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds)
if value <= 0 {
return "", 0, fmt.Errorf("invalid value in flag argument: value must be greater than 0")
}
return elements[0], resyncSeconds, nil
return elements[0], value, nil
}

// GetWatchNamespaces returns a slice of namespaces to watch for custom resource events.
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestParseReconcileFlagArgument(t *testing.T) {
{"=value", "", 0, true, "missing key in flag argument"},
{"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"},
{"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"},
{"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"},
{"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"},
{"key=-1", "", 0, true, "invalid value in flag argument: value must be greater than 0"},
{"key=-123456", "", 0, true, "invalid value in flag argument: value must be greater than 0"},
{"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"},
}
for _, test := range tests {
Expand Down
6 changes: 6 additions & 0 deletions pkg/runtime/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
ctrlrt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlrtcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
Expand Down Expand Up @@ -99,12 +100,17 @@ func (r *resourceReconciler) BindControllerManager(mgr ctrlrt.Manager) error {
r.kc = mgr.GetClient()
r.apiReader = mgr.GetAPIReader()
rd := r.rmf.ResourceDescriptor()
maxConcurrentReconciles := r.cfg.GetReconcileResourceMaxConcurrency(rd.GroupVersionKind().Kind)
return ctrlrt.NewControllerManagedBy(
mgr,
).For(
rd.EmptyRuntimeObject(),
).WithEventFilter(
predicate.GenerationChangedPredicate{},
).WithOptions(
ctrlrtcontroller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
},
).Complete(r)
}

Expand Down