Skip to content

OTA-1531: [6/6] cvo: use early gate checker to avoid delayed initialization #1196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ type Operator struct {
// via annotation
exclude string

// requiredFeatureSet is the feature set that was detected in the cluster when CVO was started and is used
// to select the manifests that will be applied in the cluster. The starting value cannot be changed in the executing
// CVO but the featurechangestopper controller will detect a feature set change in the cluster and shutdown the CVO.
// Enforcing featuresets is a standard GA CVO behavior that supports the feature gating functionality across the whole
// cluster, as opposed to the enabledFeatureGates which controls what gated behaviors of CVO itself are enabled by
// the cluster feature gates.
// See: https://github.com/openshift/enhancements/blob/master/enhancements/update/cvo-techpreview-manifests.md
requiredFeatureSet configv1.FeatureSet

// enabledFeatureGates is the checker for what gated CVO behaviors are enabled or disabled by specific cluster-level
// feature gates. It allows multiplexing the cluster-level feature gates to more granular CVO-level gates. Similarly
// to the requiredFeatureSet, the enabledFeatureGates cannot be changed in the executing CVO but the
// featurechangestopper controller will detect when cluster feature gate config changes and shutdown the CVO.
enabledFeatureGates featuregates.CvoGateChecker

clusterProfile string
Expand Down Expand Up @@ -210,6 +223,8 @@ func New(
injectClusterIdIntoPromQL bool,
updateService string,
alwaysEnableCapabilities []configv1.ClusterVersionCapability,
featureSet configv1.FeatureSet,
cvoGates featuregates.CvoGateChecker,
) (*Operator, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
Expand Down Expand Up @@ -243,10 +258,9 @@ func New(
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,

// Because of OCPBUGS-30080, we can only detect the enabled feature gates after Operator loads the initial payload
// from disk via LoadInitialPayload. We must not have any gate-checking code until that happens, so we initialize
// this field with a checker that panics when used.
enabledFeatureGates: featuregates.PanicOnUsageBeforeInitialization,
requiredFeatureSet: featureSet,
enabledFeatureGates: cvoGates,

alwaysEnableCapabilities: alwaysEnableCapabilities,
}

Expand Down Expand Up @@ -283,7 +297,7 @@ func New(

// LoadInitialPayload waits until a ClusterVersion object exists. It then retrieves the payload contents, verifies the
// initial state and returns it. If the payload is invalid, an error is returned.
func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFeatureSet configv1.FeatureSet, restConfig *rest.Config) (*payload.Update, error) {
func (optr *Operator) LoadInitialPayload(ctx context.Context, restConfig *rest.Config) (*payload.Update, error) {

// wait until cluster version object exists
if err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(ctx context.Context) (bool, error) {
Expand All @@ -303,7 +317,7 @@ func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFe
return nil, fmt.Errorf("Error when attempting to get cluster version object: %w", err)
}

update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, string(startingRequiredFeatureSet),
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, string(optr.requiredFeatureSet),
optr.clusterProfile, configv1.KnownClusterVersionCapabilities)

if err != nil {
Expand Down Expand Up @@ -339,8 +353,12 @@ func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFe

// InitializeFromPayload configures the controller that loads and applies content to the cluster given an initial payload
// and feature gate data.
func (optr *Operator) InitializeFromPayload(update *payload.Update, requiredFeatureSet configv1.FeatureSet, cvoFlags featuregates.CvoGateChecker, restConfig *rest.Config, burstRestConfig *rest.Config) {
optr.enabledFeatureGates = cvoFlags
func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *rest.Config, burstRestConfig *rest.Config) error {
update, err := optr.LoadInitialPayload(ctx, restConfig)
if err != nil {
return err
}

optr.release = update.Release
optr.releaseCreated = update.ImageRef.CreationTimestamp.Time

Expand All @@ -358,11 +376,13 @@ func (optr *Operator) InitializeFromPayload(update *payload.Update, requiredFeat
Cap: time.Second * 15,
},
optr.exclude,
requiredFeatureSet,
optr.requiredFeatureSet,
optr.eventRecorder,
optr.clusterProfile,
optr.alwaysEnableCapabilities,
)

return nil
}

// ownerReferenceModifier sets the owner reference to the current CV resource if no other reference exists. It also resets
Expand Down
11 changes: 5 additions & 6 deletions pkg/featuregates/featurechangestopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ type ChangeStopper struct {
}

// NewChangeStopper returns a new ChangeStopper.
func NewChangeStopper(featureGateInformer configinformersv1.FeatureGateInformer) (*ChangeStopper, error) {
func NewChangeStopper(featureGateInformer configinformersv1.FeatureGateInformer, requiredFeatureSet configv1.FeatureSet, cvoGates CvoGates) (*ChangeStopper, error) {
c := &ChangeStopper{
featureGateLister: featureGateInformer.Lister(),
cacheSynced: []cache.InformerSynced{featureGateInformer.Informer().HasSynced},
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "feature-gate-stopper"}),

startingRequiredFeatureSet: &requiredFeatureSet,
startingCvoGates: &cvoGates,
}

c.queue.Add("cluster") // seed an initial sync, in case startingRequiredFeatureSet is wrong
if _, err := featureGateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
Expand All @@ -54,11 +58,6 @@ func NewChangeStopper(featureGateInformer configinformersv1.FeatureGateInformer)
return c, nil
}

func (c *ChangeStopper) SetStartingFeatures(requiredFeatureSet configv1.FeatureSet, cvoGates CvoGates) {
c.startingRequiredFeatureSet = &requiredFeatureSet
c.startingCvoGates = &cvoGates
}

// syncHandler processes a single work entry, with the
// processNextWorkItem caller handling the queue management. It returns
// done when there will be no more work (because the feature gate changed).
Expand Down
3 changes: 1 addition & 2 deletions pkg/featuregates/featurechangestopper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ func TestTechPreviewChangeStopper(t *testing.T) {
client := fakeconfigv1client.NewSimpleClientset(fg)

informerFactory := configv1informer.NewSharedInformerFactory(client, 0)
c, err := NewChangeStopper(informerFactory.Config().V1().FeatureGates())
c, err := NewChangeStopper(informerFactory.Config().V1().FeatureGates(), tt.startingRequiredFeatureSet, tt.startingCvoFeatureGates)
if err != nil {
t.Fatal(err)
}
c.SetStartingFeatures(tt.startingRequiredFeatureSet, tt.startingCvoFeatureGates)
informerFactory.Start(ctx.Done())

if err := c.Run(ctx, shutdownFn); err != nil {
Expand Down
36 changes: 5 additions & 31 deletions pkg/featuregates/featuregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
// StubOpenShiftVersion is the default OpenShift version placeholder for the purpose of determining
// enabled and disabled CVO feature gates. It is assumed to never conflict with a real OpenShift
// version. Both DefaultCvoGates and CvoGatesFromFeatureGate should return a safe conservative
// default set of enabled and disabled gates, typically with unknownVersion set to true.
// default set of enabled and disabled gates when StubOpenShiftVersion is passed as the version.
// This value is an analogue to the "0.0.1-snapshot" string that is used as a placeholder in
// second-level operator manifests. Here we use the same string for consistency, but the constant
// should be only used by the callers of CvoGatesFromFeatureGate and DefaultCvoGates methods when
// the caller does not know its actual OpenShift version.
const StubOpenShiftVersion = "0.0.1-snapshot"

// CvoGateChecker allows CVO code to check which feature gates are enabled
Expand Down Expand Up @@ -41,36 +45,6 @@ type CvoGateChecker interface {
CVOConfiguration() bool
}

type panicOnUsageBeforeInitializationFunc func()

func panicOnUsageBeforeInitialization() {
panic("CVO feature flags were used before they were initialized")
}

// PanicOnUsageBeforeInitialization is a CvoGateChecker that panics if any of its methods are called. This checker should
// be used before CVO feature gates are actually known and some code tries to check them.
var PanicOnUsageBeforeInitialization = panicOnUsageBeforeInitializationFunc(panicOnUsageBeforeInitialization)

func (p panicOnUsageBeforeInitializationFunc) ReconciliationIssuesCondition() bool {
p()
return false
}

func (p panicOnUsageBeforeInitializationFunc) StatusReleaseArchitecture() bool {
p()
return false
}

func (p panicOnUsageBeforeInitializationFunc) UnknownVersion() bool {
p()
return false
}

func (p panicOnUsageBeforeInitializationFunc) CVOConfiguration() bool {
p()
return false
}

// CvoGates contains flags that control CVO functionality gated by product feature gates. The
// names do not correspond to product feature gates, the booleans here are "smaller" (product-level
// gate will enable multiple CVO behaviors).
Expand Down
99 changes: 19 additions & 80 deletions pkg/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
configv1 "github.com/openshift/api/config/v1"
clientset "github.com/openshift/client-go/config/clientset/versioned"
configinformers "github.com/openshift/client-go/config/informers/externalversions"
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
operatorinformers "github.com/openshift/client-go/operator/informers/externalversions"
"github.com/openshift/library-go/pkg/config/clusterstatus"
Expand Down Expand Up @@ -190,13 +189,13 @@ func (o *Options) Run(ctx context.Context) error {
}

clusterVersionConfigInformerFactory, configInformerFactory := o.prepareConfigInformerFactories(cb)
_, _, err = o.processInitialFeatureGate(ctx, configInformerFactory)
startingFeatureSet, startingCvoGates, err := o.processInitialFeatureGate(ctx, configInformerFactory)
if err != nil {
return fmt.Errorf("error processing feature gates: %w", err)
}

// initialize the controllers and attempt to load the payload information
controllerCtx, err := o.NewControllerContext(cb, clusterVersionConfigInformerFactory, configInformerFactory)
controllerCtx, err := o.NewControllerContext(cb, startingFeatureSet, startingCvoGates, clusterVersionConfigInformerFactory, configInformerFactory)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,7 +243,7 @@ func (o *Options) getOpenShiftVersion() string {
return releaseMetadata.Version
}

func (o *Options) processInitialFeatureGate(ctx context.Context, configInformerFactory configinformers.SharedInformerFactory) (configv1.FeatureSet, *featuregates.CvoGates, error) {
func (o *Options) processInitialFeatureGate(ctx context.Context, configInformerFactory configinformers.SharedInformerFactory) (configv1.FeatureSet, featuregates.CvoGates, error) {
featureGates := configInformerFactory.Config().V1().FeatureGates().Lister()
configInformerFactory.Start(ctx.Done())
configInformerFactory.WaitForCacheSync(ctx.Done())
Expand All @@ -256,8 +255,8 @@ func (o *Options) processInitialFeatureGate(ctx context.Context, configInformerF
var clusterFeatureGate *configv1.FeatureGate

// client-go automatically retries some network blip errors on GETs for 30s by default, and we want to
// retry the remaining ones ourselves. If we fail longer than that, the operator won't be able to do work
// anyway. Return the error and crashloop.
// retry the remaining ones ourselves. If we fail longer than that, CVO won't be able to do work anyway.
// Return the error and crashloop.
//
// We implement the timeout with a context because the timeout in PollImmediateWithContext does not behave
// well when ConditionFunc takes longer time to execute, like here where the GET can be retried by client-go
Expand All @@ -284,17 +283,17 @@ func (o *Options) processInitialFeatureGate(ctx context.Context, configInformerF
}
}); err != nil {
if lastError != nil {
return "", nil, lastError
return "", cvoGates, lastError
}
return "", nil, err
return "", cvoGates, err
}

if cvoGates.UnknownVersion() {
klog.Warningf("CVO features for version %s could not be detected from FeatureGate; will use defaults plus special UnknownVersion feature gate", cvoOpenShiftVersion)
}
klog.Infof("CVO features for version %s enabled at startup: %+v", cvoOpenShiftVersion, cvoGates)

return startingFeatureSet, &cvoGates, nil
return startingFeatureSet, cvoGates, nil
}

// run launches a number of goroutines to handle manifest application,
Expand Down Expand Up @@ -362,7 +361,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock resource
resultChannel <- asyncResult{name: "metrics server", error: err}
}()
}
if err := controllerCtx.InitializeFromPayload(runContext, restConfig, burstRestConfig); err != nil {
if err := controllerCtx.CVO.InitializeFromPayload(runContext, restConfig, burstRestConfig); err != nil {
if firstError == nil {
firstError = err
}
Expand Down Expand Up @@ -577,13 +576,17 @@ type Context struct {
// OperatorInformerFactory should be used to get informers / listers for code that works with resources from the
// operator.openshift.io group
OperatorInformerFactory operatorinformers.SharedInformerFactory

fgLister configlistersv1.FeatureGateLister
}

// NewControllerContext initializes the default Context for the current Options. It does
// not start any background processes.
func (o *Options) NewControllerContext(cb *ClientBuilder, clusterVersionConfigInformerFactory, configInformerFactory configinformers.SharedInformerFactory) (*Context, error) {
func (o *Options) NewControllerContext(
cb *ClientBuilder,
startingFeatureSet configv1.FeatureSet,
startingCvoGates featuregates.CvoGates,
clusterVersionConfigInformerFactory,
configInformerFactory configinformers.SharedInformerFactory,
) (*Context, error) {
kubeClient := cb.KubeClientOrDie(internal.ConfigNamespace, useProtobuf)
openshiftConfigInformerFactory := coreinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), coreinformers.WithNamespace(internal.ConfigNamespace))
openshiftConfigManagedInformerFactory := coreinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), coreinformers.WithNamespace(internal.ConfigManagedNamespace))
Expand Down Expand Up @@ -621,12 +624,14 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, clusterVersionConfigIn
o.InjectClusterIdIntoPromQL,
o.UpdateService,
stringsToCapabilities(o.AlwaysEnableCapabilities),
startingFeatureSet,
startingCvoGates,
)
if err != nil {
return nil, err
}

featureChangeStopper, err := featuregates.NewChangeStopper(configInformerFactory.Config().V1().FeatureGates())
featureChangeStopper, err := featuregates.NewChangeStopper(configInformerFactory.Config().V1().FeatureGates(), startingFeatureSet, startingCvoGates)
if err != nil {
return nil, err
}
Expand All @@ -639,8 +644,6 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, clusterVersionConfigIn
OperatorInformerFactory: operatorInformerFactory,
CVO: cvo,
StopOnFeatureGateChange: featureChangeStopper,

fgLister: configInformerFactory.Config().V1().FeatureGates().Lister(),
}

if o.EnableAutoUpdate {
Expand All @@ -663,70 +666,6 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, clusterVersionConfigIn
return ctx, nil
}

// InitializeFromPayload initializes the CVO and FeatureGate ChangeStoppers controllers from the payload. It extracts the
// current CVO version from the initial payload and uses it to determine the initial the required featureset and enabled
// feature gates. Both the payload and determined feature information are used to initialize CVO and feature gate
// ChangeStopper controllers.
func (c *Context) InitializeFromPayload(ctx context.Context, restConfig *rest.Config, burstRestConfig *rest.Config) error {
var startingFeatureSet configv1.FeatureSet
var clusterFeatureGate *configv1.FeatureGate

// client-go automatically retries some network blip errors on GETs for 30s by default, and we want to
// retry the remaining ones ourselves. If we fail longer than that, the operator won't be able to do work
// anyway. Return the error and crashloop.
//
// We implement the timeout with a context because the timeout in PollImmediateWithContext does not behave
// well when ConditionFunc takes longer time to execute, like here where the GET can be retried by client-go
var lastError error
if err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 25*time.Second, true, func(ctx context.Context) (bool, error) {
gate, fgErr := c.fgLister.Get("cluster")
switch {
case apierrors.IsNotFound(fgErr):
// if we have no featuregates, then the cluster is using the default featureset, which is "".
// This excludes everything that could possibly depend on a different feature set.
startingFeatureSet = ""
klog.Infof("FeatureGate not found in cluster, using default feature set %q at startup", startingFeatureSet)
return true, nil
case fgErr != nil:
lastError = fgErr
klog.Warningf("Failed to get FeatureGate from cluster: %v", fgErr)
return false, nil
default:
clusterFeatureGate = gate
startingFeatureSet = gate.Spec.FeatureSet
klog.Infof("FeatureGate found in cluster, using its feature set %q at startup", startingFeatureSet)
return true, nil
}
}); err != nil {
if lastError != nil {
return lastError
}
return err
}

payload, err := c.CVO.LoadInitialPayload(ctx, startingFeatureSet, restConfig)
if err != nil {
return err
}

var cvoGates featuregates.CvoGates
if clusterFeatureGate != nil {
cvoGates = featuregates.CvoGatesFromFeatureGate(clusterFeatureGate, payload.Release.Version)
} else {
cvoGates = featuregates.DefaultCvoGates(payload.Release.Version)
}

if cvoGates.UnknownVersion() {
klog.Infof("CVO features for version %s could not be detected from FeatureGate; will use defaults plus special UnknownVersion feature gate", payload.Release.Version)
}
klog.Infof("CVO features for version %s enabled at startup: %+v", payload.Release.Version, cvoGates)

c.StopOnFeatureGateChange.SetStartingFeatures(startingFeatureSet, cvoGates)
c.CVO.InitializeFromPayload(payload, startingFeatureSet, cvoGates, restConfig, burstRestConfig)

return nil
}

func stringsToCapabilities(names []string) []configv1.ClusterVersionCapability {
caps := make([]configv1.ClusterVersionCapability, len(names))
for i, c := range names {
Expand Down
Loading