-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Kubernetes POD deployment goroutine leak #5693
Changes from 5 commits
0b5e08f
fc7495d
3dd8411
df5fe64
9213257
99ab473
a577d91
169aeb6
118ad2d
b2a8ad0
ea3de58
c6f7d41
1342ba6
492d5de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,28 +35,33 @@ import ( | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/informers" | ||
k8s "k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/cache" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
const ( | ||
DefaultCacheResyncInterval = time.Minute * 10 | ||
DefaultDeploymentTimeout = time.Minute * 5 | ||
DefaultTestDeploymentTimeout = time.Second * 5 | ||
DefaultDeploymentTimeout = time.Minute * time.Duration(5) | ||
DefaultCacheResyncInterval = time.Minute * time.Duration(10) | ||
) | ||
|
||
var TestHook bool | ||
|
||
// NewKubernetesHandler creates Kubernetes Resource handler instance. | ||
func NewKubernetesHandler(client client.Client, clientSet k8s.Interface) ResourceHandler { | ||
return &kubernetesHandler{client: client, clientSet: clientSet} | ||
return &kubernetesHandler{ | ||
client: client, | ||
clientSet: clientSet, | ||
deploymentTimeOut: DefaultDeploymentTimeout, | ||
cacheResyncInterval: DefaultCacheResyncInterval, | ||
} | ||
} | ||
|
||
type kubernetesHandler struct { | ||
client client.Client | ||
clientSet k8s.Interface | ||
|
||
deploymentTimeOut time.Duration | ||
cacheResyncInterval time.Duration | ||
} | ||
|
||
func (handler *kubernetesHandler) Put(ctx context.Context, options *PutOptions) (map[string]string, error) { | ||
|
@@ -100,121 +105,84 @@ func (handler *kubernetesHandler) Put(ctx context.Context, options *PutOptions) | |
}, | ||
} | ||
|
||
if !strings.EqualFold(item.GetKind(), "Deployment") { | ||
return properties, nil // only checking further the Deployment output resource status | ||
// Monitor the created or updated resource until it is ready. | ||
switch strings.ToLower(item.GetKind()) { | ||
case "deployment": | ||
// Monitor the deployment until it is ready. | ||
err = handler.waitUntilDeploymentIsReady(ctx, &item) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return properties, nil | ||
default: | ||
// We do not monitor the other resource types. | ||
return properties, nil | ||
} | ||
} | ||
|
||
timeout := DefaultDeploymentTimeout | ||
|
||
// Setting the lower limits for testing when TestHook is enabled | ||
if TestHook { | ||
timeout = DefaultTestDeploymentTimeout | ||
} | ||
func (handler *kubernetesHandler) waitUntilDeploymentIsReady(ctx context.Context, item client.Object) error { | ||
logger := ucplog.FromContextOrDiscard(ctx) | ||
|
||
ctx, cancel := context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
doneCh := make(chan bool, 1) | ||
errCh := make(chan error, 1) | ||
|
||
logger := ucplog.FromContextOrDiscard(ctx) | ||
ctx, cancel := context.WithTimeout(ctx, handler.deploymentTimeOut) | ||
|
||
readinessCh := make(chan bool, 1) | ||
watchErrorCh := make(chan error, 1) | ||
go func() { | ||
informerFactory := informers.NewSharedInformerFactoryWithOptions(handler.clientSet, DefaultCacheResyncInterval, informers.WithNamespace(item.GetNamespace())) | ||
handler.WatchUntilReady(ctx, informerFactory, &item, readinessCh, watchErrorCh, nil) | ||
}() | ||
handler.startDeploymentInformer(ctx, item, doneCh, errCh) | ||
// This ensures that the informer is stopped when this function is returned. | ||
defer cancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realized that we do not need to call cancel explicitly when the status is matched, because of this defer cancel. In every case, the informer is cancelled. |
||
|
||
select { | ||
case <-ctx.Done(): | ||
// Get the final deployment status | ||
dep, err := handler.clientSet.AppsV1().Deployments(item.GetNamespace()).Get(ctx, item.GetName(), metav1.GetOptions{}) | ||
if err != nil { | ||
return nil, fmt.Errorf("deployment timed out, name: %s, namespace %s, error occured while fetching latest status: %w", item.GetName(), item.GetNamespace(), err) | ||
return fmt.Errorf("deployment timed out, name: %s, namespace %s, error occured while fetching latest status: %w", item.GetName(), item.GetNamespace(), err) | ||
} | ||
|
||
// Now get the latest available observation of deployment current state | ||
// note that there can be a race condition here, by the time it fetches the latest status, deployment might be succeeded | ||
status := v1.DeploymentCondition{} | ||
if len(dep.Status.Conditions) >= 1 { | ||
if len(dep.Status.Conditions) > 0 { | ||
status = dep.Status.Conditions[len(dep.Status.Conditions)-1] | ||
} | ||
return fmt.Errorf("deployment timed out, name: %s, namespace %s, status: %s, reason: %s", item.GetName(), item.GetNamespace(), status.Message, status.Reason) | ||
|
||
return nil, fmt.Errorf("deployment timed out, name: %s, namespace %s, status: %s, reason: %s", item.GetName(), item.GetNamespace(), status.Message, status.Reason) | ||
case <-readinessCh: | ||
case <-doneCh: | ||
logger.Info(fmt.Sprintf("Marking deployment %s in namespace %s as complete", item.GetName(), item.GetNamespace())) | ||
return properties, nil | ||
case <-watchErrorCh: | ||
return nil, err | ||
} | ||
} | ||
return nil | ||
|
||
func (handler *kubernetesHandler) Delete(ctx context.Context, options *DeleteOptions) error { | ||
identity := &resourcemodel.KubernetesIdentity{} | ||
if err := store.DecodeMap(options.Resource.Identity.Data, identity); err != nil { | ||
case err := <-errCh: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lol. We never tried to read the error from the channel, which was a bug. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😆 |
||
return err | ||
} | ||
|
||
item := unstructured.Unstructured{ | ||
Object: map[string]any{ | ||
"apiVersion": identity.APIVersion, | ||
"kind": identity.Kind, | ||
"metadata": map[string]any{ | ||
"namespace": identity.Namespace, | ||
"name": identity.Name, | ||
}, | ||
}, | ||
} | ||
|
||
return client.IgnoreNotFound(handler.client.Delete(ctx, &item)) | ||
} | ||
|
||
func convertToUnstructured(resource rpv1.OutputResource) (unstructured.Unstructured, error) { | ||
if resource.ResourceType.Provider != resourcemodel.ProviderKubernetes { | ||
return unstructured.Unstructured{}, errors.New("wrong resource type") | ||
} | ||
|
||
obj, ok := resource.Resource.(runtime.Object) | ||
if !ok { | ||
return unstructured.Unstructured{}, errors.New("inner type was not a runtime.Object") | ||
} | ||
|
||
c, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource.Resource) | ||
if err != nil { | ||
return unstructured.Unstructured{}, fmt.Errorf("could not convert object %v to unstructured: %w", obj.GetObjectKind(), err) | ||
} | ||
|
||
return unstructured.Unstructured{Object: c}, nil | ||
} | ||
|
||
func (handler *kubernetesHandler) WatchUntilReady(ctx context.Context, informerFactory informers.SharedInformerFactory, item client.Object, readinessCh chan<- bool, watchErrorCh chan<- error, eventHandlerInvokedCh chan struct{}) { | ||
deploymentInformer := informerFactory.Apps().V1().Deployments().Informer() | ||
func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, item client.Object, doneCh chan<- bool, errCh chan<- error) { | ||
informers := informers.NewSharedInformerFactoryWithOptions(handler.clientSet, handler.cacheResyncInterval, informers.WithNamespace(item.GetNamespace())) | ||
deploymentInformer := informers.Apps().V1().Deployments().Informer() | ||
handlers := cache.ResourceEventHandlerFuncs{ | ||
AddFunc: func(new_obj any) { | ||
obj, ok := new_obj.(*v1.Deployment) | ||
if !ok { | ||
watchErrorCh <- errors.New("deployment object is not of appsv1.Deployment type") | ||
errCh <- errors.New("deployment object is not of appsv1.Deployment type") | ||
return | ||
} | ||
|
||
// There might be parallel deployments in progress, we need to make sure we are watching the right one | ||
if obj.Name != item.GetName() { | ||
return | ||
} | ||
|
||
// This channel is used only by test code to signal that the deployment is being watched | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was used only for testing. We can test without it. |
||
if eventHandlerInvokedCh != nil { | ||
eventHandlerInvokedCh <- struct{}{} | ||
} | ||
|
||
handler.watchUntilDeploymentReady(ctx, obj, readinessCh) | ||
handler.checkDeploymentStatus(ctx, obj, doneCh) | ||
}, | ||
UpdateFunc: func(old_obj, new_obj any) { | ||
old, ok := old_obj.(*v1.Deployment) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not know how to test this with fakeInformer. :( |
||
if !ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a redundant check. This will never happen since it listens to the v1 Deployment informer. |
||
watchErrorCh <- errors.New("old deployment object is not of appsv1.Deployment type") | ||
errCh <- errors.New("old deployment object is not of appsv1.Deployment type") | ||
return | ||
} | ||
new, ok := new_obj.(*v1.Deployment) | ||
if !ok { | ||
watchErrorCh <- errors.New("new deployment object not of appsv1.Deployment type") | ||
errCh <- errors.New("new deployment object is not of appsv1.Deployment type") | ||
return | ||
} | ||
|
||
|
@@ -223,38 +191,74 @@ func (handler *kubernetesHandler) WatchUntilReady(ctx context.Context, informerF | |
return | ||
} | ||
|
||
// This channel is used only by test code to signal that the deployment is being watched | ||
if eventHandlerInvokedCh != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is unnecessary code for testing. |
||
eventHandlerInvokedCh <- struct{}{} | ||
} | ||
|
||
if old.ResourceVersion != new.ResourceVersion { | ||
handler.watchUntilDeploymentReady(ctx, new, readinessCh) | ||
handler.checkDeploymentStatus(ctx, new, doneCh) | ||
} | ||
}, | ||
DeleteFunc: func(obj any) { | ||
// no-op here | ||
}, | ||
} | ||
|
||
deploymentInformer.AddEventHandler(handlers) | ||
|
||
// Start the informer | ||
informerFactory.Start(wait.NeverStop) | ||
informerFactory.WaitForCacheSync(wait.NeverStop) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a good approach since it waits for all shared informers lol. We need to wait only for the informers that we created here. |
||
go func(stopCh <-chan struct{}) { | ||
informers.Start(stopCh) | ||
informers.WaitForCacheSync(stopCh) | ||
}(ctx.Done()) | ||
youngbupark marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (handler *kubernetesHandler) watchUntilDeploymentReady(ctx context.Context, obj *v1.Deployment, readinessCh chan<- bool) { | ||
func (handler *kubernetesHandler) checkDeploymentStatus(ctx context.Context, obj *v1.Deployment, doneCh chan<- bool) { | ||
logger := ucplog.FromContextOrDiscard(ctx) | ||
for _, c := range obj.Status.Conditions { | ||
// check for complete deployment condition | ||
// Reference https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment | ||
if c.Type == v1.DeploymentProgressing && c.Status == corev1.ConditionTrue && strings.EqualFold(c.Reason, "NewReplicaSetAvailable") { | ||
logger.Info(fmt.Sprintf("Deployment status for deployment: %s in namespace: %s is: %s - %s, Reason: %s", obj.Name, obj.Namespace, c.Type, c.Status, c.Reason)) | ||
|
||
// ObservedGeneration should be updated to latest generation to avoid stale replicas | ||
if obj.Status.ObservedGeneration >= obj.Generation { | ||
logger.Info(fmt.Sprintf("Deployment %s in namespace %s is ready. Observed generation: %d, Generation: %d", obj.Name, obj.Namespace, obj.Status.ObservedGeneration, obj.Generation)) | ||
readinessCh <- true | ||
doneCh <- true | ||
youngbupark marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
} | ||
|
||
func (handler *kubernetesHandler) Delete(ctx context.Context, options *DeleteOptions) error { | ||
identity := &resourcemodel.KubernetesIdentity{} | ||
if err := store.DecodeMap(options.Resource.Identity.Data, identity); err != nil { | ||
return err | ||
} | ||
|
||
item := unstructured.Unstructured{ | ||
Object: map[string]any{ | ||
"apiVersion": identity.APIVersion, | ||
"kind": identity.Kind, | ||
"metadata": map[string]any{ | ||
"namespace": identity.Namespace, | ||
"name": identity.Name, | ||
}, | ||
}, | ||
} | ||
|
||
return client.IgnoreNotFound(handler.client.Delete(ctx, &item)) | ||
} | ||
|
||
func convertToUnstructured(resource rpv1.OutputResource) (unstructured.Unstructured, error) { | ||
if resource.ResourceType.Provider != resourcemodel.ProviderKubernetes { | ||
return unstructured.Unstructured{}, errors.New("wrong resource type") | ||
} | ||
|
||
obj, ok := resource.Resource.(runtime.Object) | ||
if !ok { | ||
return unstructured.Unstructured{}, errors.New("inner type was not a runtime.Object") | ||
} | ||
|
||
c, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource.Resource) | ||
if err != nil { | ||
return unstructured.Unstructured{}, fmt.Errorf("could not convert object %v to unstructured: %w", obj.GetObjectKind(), err) | ||
} | ||
|
||
return unstructured.Unstructured{Object: c}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the timeout duration to the kubernetesHandler struct so that we can avoid test-specific code in the logic. (I do not like the old pattern.)