
Fix Kubernetes POD deployment goroutine leak #5693

Merged · 14 commits · Jun 13, 2023
184 changes: 94 additions & 90 deletions pkg/corerp/handlers/kubernetes.go
@@ -35,28 +35,33 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
DefaultCacheResyncInterval = time.Minute * 10
DefaultDeploymentTimeout = time.Minute * 5
DefaultTestDeploymentTimeout = time.Second * 5
DefaultDeploymentTimeout = time.Minute * time.Duration(5)
DefaultCacheResyncInterval = time.Minute * time.Duration(10)
)

var TestHook bool


I added a timeout duration to the kubernetesHandler struct so we can avoid the test-specific code in the logic. (I do not like the old pattern.)
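For illustration, a rough sketch (not part of this PR; the fake clients below are assumptions) of how an in-package test could now construct the handler with a short timeout instead of the removed TestHook / DefaultTestDeploymentTimeout pair:

```go
package handlers

import (
	"testing"
	"time"

	k8sfake "k8s.io/client-go/kubernetes/fake"
	crfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// newTestHandler is a hypothetical helper: the short deploymentTimeOut takes
// the place of the old TestHook flag.
func newTestHandler(t *testing.T) *kubernetesHandler {
	t.Helper()
	return &kubernetesHandler{
		client:              crfake.NewClientBuilder().Build(), // fake controller-runtime client (assumption)
		clientSet:           k8sfake.NewSimpleClientset(),      // fake client-go clientset (assumption)
		deploymentTimeOut:   time.Second,
		cacheResyncInterval: DefaultCacheResyncInterval,
	}
}
```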


// NewKubernetesHandler creates Kubernetes Resource handler instance.
func NewKubernetesHandler(client client.Client, clientSet k8s.Interface) ResourceHandler {
return &kubernetesHandler{client: client, clientSet: clientSet}
return &kubernetesHandler{
client: client,
clientSet: clientSet,
deploymentTimeOut: DefaultDeploymentTimeout,
cacheResyncInterval: DefaultCacheResyncInterval,
}
}

type kubernetesHandler struct {
client client.Client
clientSet k8s.Interface

deploymentTimeOut time.Duration
cacheResyncInterval time.Duration
}

func (handler *kubernetesHandler) Put(ctx context.Context, options *PutOptions) (map[string]string, error) {
@@ -100,121 +105,84 @@ func (handler *kubernetesHandler) Put(ctx context.Context, options *PutOptions)
},
}

if !strings.EqualFold(item.GetKind(), "Deployment") {
return properties, nil // only checking further the Deployment output resource status
// Monitor the created or updated resource until it is ready.
switch strings.ToLower(item.GetKind()) {
case "deployment":
// Monitor the deployment until it is ready.
err = handler.waitUntilDeploymentIsReady(ctx, &item)
if err != nil {
return nil, err
}
return properties, nil
default:
// We do not monitor the other resource types.
return properties, nil
}
}

timeout := DefaultDeploymentTimeout

// Setting the lower limits for testing when TestHook is enabled
if TestHook {
timeout = DefaultTestDeploymentTimeout
}
func (handler *kubernetesHandler) waitUntilDeploymentIsReady(ctx context.Context, item client.Object) error {
logger := ucplog.FromContextOrDiscard(ctx)

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
doneCh := make(chan bool, 1)
errCh := make(chan error, 1)

logger := ucplog.FromContextOrDiscard(ctx)
ctx, cancel := context.WithTimeout(ctx, handler.deploymentTimeOut)

readinessCh := make(chan bool, 1)
watchErrorCh := make(chan error, 1)
go func() {
informerFactory := informers.NewSharedInformerFactoryWithOptions(handler.clientSet, DefaultCacheResyncInterval, informers.WithNamespace(item.GetNamespace()))
handler.WatchUntilReady(ctx, informerFactory, &item, readinessCh, watchErrorCh, nil)
}()
handler.startDeploymentInformer(ctx, item, doneCh, errCh)
// This ensures that the informer is stopped when this function is returned.
defer cancel()
@youngbupark (Jun 13, 2023):

I realized that we do not need to call cancel explicitly when the status is matched, because of this defer cancel. In any situation, the informer is cancelled.
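A standalone sketch (not code from this repo) of the pattern being described: the deferred cancel closes ctx.Done() on every return path, and that is the stop channel the informer goroutine watches, so the success case needs no explicit cancel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func waitUntilReady(parent context.Context) error {
	ctx, cancel := context.WithTimeout(parent, 5*time.Second)
	defer cancel() // fires on success, error, and timeout alike

	go func(stopCh <-chan struct{}) {
		<-stopCh // stands in for the informer goroutine bound to ctx.Done()
		fmt.Println("informer goroutine stopped")
	}(ctx.Done())

	// Pretend the deployment became ready right away and return successfully.
	// The deferred cancel still stops the goroutine above, so nothing leaks.
	return nil
}

func main() {
	_ = waitUntilReady(context.Background())
	time.Sleep(10 * time.Millisecond) // give the goroutine a moment to observe ctx.Done()
}
```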


select {
case <-ctx.Done():
// Get the final deployment status
dep, err := handler.clientSet.AppsV1().Deployments(item.GetNamespace()).Get(ctx, item.GetName(), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("deployment timed out, name: %s, namespace %s, error occurred while fetching latest status: %w", item.GetName(), item.GetNamespace(), err)
return fmt.Errorf("deployment timed out, name: %s, namespace %s, error occurred while fetching latest status: %w", item.GetName(), item.GetNamespace(), err)
}

// Now get the latest available observation of deployment current state
// note that there can be a race condition here, by the time it fetches the latest status, deployment might be succeeded
status := v1.DeploymentCondition{}
if len(dep.Status.Conditions) >= 1 {
if len(dep.Status.Conditions) > 0 {
status = dep.Status.Conditions[len(dep.Status.Conditions)-1]
}
return fmt.Errorf("deployment timed out, name: %s, namespace %s, status: %s, reason: %s", item.GetName(), item.GetNamespace(), status.Message, status.Reason)

return nil, fmt.Errorf("deployment timed out, name: %s, namespace %s, status: %s, reason: %s", item.GetName(), item.GetNamespace(), status.Message, status.Reason)
case <-readinessCh:
case <-doneCh:
logger.Info(fmt.Sprintf("Marking deployment %s in namespace %s as complete", item.GetName(), item.GetNamespace()))
return properties, nil
case <-watchErrorCh:
return nil, err
}
}
return nil

func (handler *kubernetesHandler) Delete(ctx context.Context, options *DeleteOptions) error {
identity := &resourcemodel.KubernetesIdentity{}
if err := store.DecodeMap(options.Resource.Identity.Data, identity); err != nil {
case err := <-errCh:
@youngbupark (Jun 13, 2023):

lol. We never tried to read the error from the channel, which was a bug.

Contributor:

😆

return err
}
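A standalone illustration (a sketch, not code from this repo) of the bug being joked about: selecting on the error channel without binding the received value drops the sent error and returns the stale outer err instead.

```go
package main

import (
	"errors"
	"fmt"
)

// buggy mirrors the old select arm: the value on errCh is discarded and the
// stale outer err (nil here) is returned instead.
func buggy(errCh <-chan error) error {
	var err error
	select {
	case <-errCh:
		return err
	}
}

// fixed mirrors the new select arm: the error is received from the channel.
func fixed(errCh <-chan error) error {
	select {
	case err := <-errCh:
		return err
	}
}

func main() {
	errCh := make(chan error, 1)

	errCh <- errors.New("informer failed")
	fmt.Println(buggy(errCh)) // <nil>

	errCh <- errors.New("informer failed")
	fmt.Println(fixed(errCh)) // informer failed
}
```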

item := unstructured.Unstructured{
Object: map[string]any{
"apiVersion": identity.APIVersion,
"kind": identity.Kind,
"metadata": map[string]any{
"namespace": identity.Namespace,
"name": identity.Name,
},
},
}

return client.IgnoreNotFound(handler.client.Delete(ctx, &item))
}

func convertToUnstructured(resource rpv1.OutputResource) (unstructured.Unstructured, error) {
if resource.ResourceType.Provider != resourcemodel.ProviderKubernetes {
return unstructured.Unstructured{}, errors.New("wrong resource type")
}

obj, ok := resource.Resource.(runtime.Object)
if !ok {
return unstructured.Unstructured{}, errors.New("inner type was not a runtime.Object")
}

c, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource.Resource)
if err != nil {
return unstructured.Unstructured{}, fmt.Errorf("could not convert object %v to unstructured: %w", obj.GetObjectKind(), err)
}

return unstructured.Unstructured{Object: c}, nil
}

func (handler *kubernetesHandler) WatchUntilReady(ctx context.Context, informerFactory informers.SharedInformerFactory, item client.Object, readinessCh chan<- bool, watchErrorCh chan<- error, eventHandlerInvokedCh chan struct{}) {
deploymentInformer := informerFactory.Apps().V1().Deployments().Informer()
func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, item client.Object, doneCh chan<- bool, errCh chan<- error) {
informers := informers.NewSharedInformerFactoryWithOptions(handler.clientSet, handler.cacheResyncInterval, informers.WithNamespace(item.GetNamespace()))
deploymentInformer := informers.Apps().V1().Deployments().Informer()
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(new_obj any) {
obj, ok := new_obj.(*v1.Deployment)
if !ok {
watchErrorCh <- errors.New("deployment object is not of appsv1.Deployment type")
errCh <- errors.New("deployment object is not of appsv1.Deployment type")
return
}

// There might be parallel deployments in progress, we need to make sure we are watching the right one
if obj.Name != item.GetName() {
return
}

// This channel is used only by test code to signal that the deployment is being watched

This was used only for testing. We can test without it.

if eventHandlerInvokedCh != nil {
eventHandlerInvokedCh <- struct{}{}
}

handler.watchUntilDeploymentReady(ctx, obj, readinessCh)
handler.checkDeploymentStatus(ctx, obj, doneCh)
},
UpdateFunc: func(old_obj, new_obj any) {
old, ok := old_obj.(*v1.Deployment)

I do not know how to test this with the fake informer. :(

if !ok {

This is a redundant check. It will never happen, since this listens to the v1 Deployment informer.

watchErrorCh <- errors.New("old deployment object is not of appsv1.Deployment type")
errCh <- errors.New("old deployment object is not of appsv1.Deployment type")
return
}
new, ok := new_obj.(*v1.Deployment)
if !ok {
watchErrorCh <- errors.New("new deployment object not of appsv1.Deployment type")
errCh <- errors.New("new deployment object is not of appsv1.Deployment type")
return
}

@@ -223,38 +223,74 @@ func (handler *kubernetesHandler) WatchUntilReady(ctx context.Context, informerF
return
}

// This channel is used only by test code to signal that the deployment is being watched
if eventHandlerInvokedCh != nil {

This is unnecessary code that existed only for testing.

eventHandlerInvokedCh <- struct{}{}
}

if old.ResourceVersion != new.ResourceVersion {
handler.watchUntilDeploymentReady(ctx, new, readinessCh)
handler.checkDeploymentStatus(ctx, new, doneCh)
}
},
DeleteFunc: func(obj any) {
// no-op here
},
}

deploymentInformer.AddEventHandler(handlers)

// Start the informer
informerFactory.Start(wait.NeverStop)
informerFactory.WaitForCacheSync(wait.NeverStop)

This is not a good way, since it waits for all shared informers lol. We need to wait only on the informers that we created here.

go func(stopCh <-chan struct{}) {
informers.Start(stopCh)
informers.WaitForCacheSync(stopCh)
}(ctx.Done())
}
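A sketch of the narrower wait the comment above asks for (an alternative, not what this PR does): block only on the HasSynced of the single informer created in this function, rather than on every informer registered with the shared factory. The helper name is hypothetical.

```go
package handlers

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// startAndWaitForSync is a hypothetical helper: it starts the factory but
// waits only until the given informer's cache has synced (or stopCh closes).
func startAndWaitForSync(factory informers.SharedInformerFactory, informer cache.SharedIndexInformer, stopCh <-chan struct{}) bool {
	factory.Start(stopCh)
	return cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
```

Inside startDeploymentInformer, the goroutine body would then become startAndWaitForSync(informers, deploymentInformer, ctx.Done()), keeping the same stop-channel semantics.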

func (handler *kubernetesHandler) watchUntilDeploymentReady(ctx context.Context, obj *v1.Deployment, readinessCh chan<- bool) {
func (handler *kubernetesHandler) checkDeploymentStatus(ctx context.Context, obj *v1.Deployment, doneCh chan<- bool) {
logger := ucplog.FromContextOrDiscard(ctx)
for _, c := range obj.Status.Conditions {
// check for complete deployment condition
// Reference https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment
if c.Type == v1.DeploymentProgressing && c.Status == corev1.ConditionTrue && strings.EqualFold(c.Reason, "NewReplicaSetAvailable") {
logger.Info(fmt.Sprintf("Deployment status for deployment: %s in namespace: %s is: %s - %s, Reason: %s", obj.Name, obj.Namespace, c.Type, c.Status, c.Reason))

// ObservedGeneration should be updated to latest generation to avoid stale replicas
if obj.Status.ObservedGeneration >= obj.Generation {
logger.Info(fmt.Sprintf("Deployment %s in namespace %s is ready. Observed generation: %d, Generation: %d", obj.Name, obj.Namespace, obj.Status.ObservedGeneration, obj.Generation))
readinessCh <- true
doneCh <- true
}
}
}
}

func (handler *kubernetesHandler) Delete(ctx context.Context, options *DeleteOptions) error {
identity := &resourcemodel.KubernetesIdentity{}
if err := store.DecodeMap(options.Resource.Identity.Data, identity); err != nil {
return err
}

item := unstructured.Unstructured{
Object: map[string]any{
"apiVersion": identity.APIVersion,
"kind": identity.Kind,
"metadata": map[string]any{
"namespace": identity.Namespace,
"name": identity.Name,
},
},
}

return client.IgnoreNotFound(handler.client.Delete(ctx, &item))
}

func convertToUnstructured(resource rpv1.OutputResource) (unstructured.Unstructured, error) {
if resource.ResourceType.Provider != resourcemodel.ProviderKubernetes {
return unstructured.Unstructured{}, errors.New("wrong resource type")
}

obj, ok := resource.Resource.(runtime.Object)
if !ok {
return unstructured.Unstructured{}, errors.New("inner type was not a runtime.Object")
}

c, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource.Resource)
if err != nil {
return unstructured.Unstructured{}, fmt.Errorf("could not convert object %v to unstructured: %w", obj.GetObjectKind(), err)
}

return unstructured.Unstructured{Object: c}, nil
}