diff --git a/pkg/platform/proxy/apps/daemonset/storage/event.go b/pkg/platform/proxy/apps/daemonset/storage/event.go index fae8aa151..52fedf5b4 100644 --- a/pkg/platform/proxy/apps/daemonset/storage/event.go +++ b/pkg/platform/proxy/apps/daemonset/storage/event.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" @@ -84,22 +85,16 @@ func listEventsByExtensions(ctx context.Context, client *kubernetes.Clientset, n return nil, errors.NewNotFound(extensionsv1beta1.Resource("daemonsets/events"), name) } - selector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(daemonSet.UID)), - fields.OneTermEqualSelector("involvedObject.name", daemonSet.Name), - fields.OneTermEqualSelector("involvedObject.namespace", daemonSet.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "DaemonSet")) - listOptions := metav1.ListOptions{ - FieldSelector: selector.String(), - } - daemonSetEvents, err := client.CoreV1().Events(namespaceName).List(ctx, listOptions) - if err != nil { - return nil, err + var resultEvents util.EventSlice + + events, errs := getAboutDaemonSetEvents(ctx, client, name, namespaceName, string(daemonSet.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - var events util.EventSlice - for _, daemonSetEvent := range daemonSetEvents.Items { - events = append(events, daemonSetEvent) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(daemonSet.UID)]; ok { + resultEvents = append(resultEvents, v...) } podSelector, err := metav1.LabelSelectorAsSelector(daemonSet.Spec.Selector) @@ -117,30 +112,17 @@ func listEventsByExtensions(ctx context.Context, client *kubernetes.Clientset, n for _, pod := range podAllList.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "DaemonSet") && (podReferences.Name == daemonSet.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), - } - podEvents, err := client.CoreV1().Events(namespaceName).List(ctx, podEventsListOptions) - if err != nil { - return nil, err - } - - for _, podEvent := range podEvents.Items { - events = append(events, podEvent) + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } } } } - sort.Sort(events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: events, + Items: resultEvents, }, nil } @@ -150,22 +132,16 @@ func listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespa return nil, errors.NewNotFound(appsv1.Resource("daemonsets/events"), name) } - selector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(daemonSet.UID)), - fields.OneTermEqualSelector("involvedObject.name", daemonSet.Name), - fields.OneTermEqualSelector("involvedObject.namespace", daemonSet.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "DaemonSet")) - listOptions := metav1.ListOptions{ - FieldSelector: selector.String(), - } - daemonSetEvents, err := client.CoreV1().Events(namespaceName).List(ctx, listOptions) - if err != nil { - return nil, err + var resultEvents util.EventSlice + + events, errs := getAboutDaemonSetEvents(ctx, client, name, namespaceName, string(daemonSet.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - var events util.EventSlice - for _, daemonSetEvent := range daemonSetEvents.Items { - events = append(events, daemonSetEvent) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(daemonSet.UID)]; ok { + resultEvents = append(resultEvents, v...) } podSelector, err := metav1.LabelSelectorAsSelector(daemonSet.Spec.Selector) @@ -183,29 +159,36 @@ func listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespa for _, pod := range podAllList.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "DaemonSet") && (podReferences.Name == daemonSet.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), - } - podEvents, err := client.CoreV1().Events(namespaceName).List(ctx, podEventsListOptions) - if err != nil { - return nil, err - } - - for _, podEvent := range podEvents.Items { - events = append(events, podEvent) + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } } } } - sort.Sort(events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: events, + Items: resultEvents, }, nil } + +// getAboutDaemonSetEvents Query all events in the namespace or Query the DaemonSet Pod asynchronously +func getAboutDaemonSetEvents(ctx context.Context, client *kubernetes.Clientset, name, namespace, uid string) (util.EventSlice, []error) { + return util.GetResourcesEvents(ctx, client, namespace, []metav1.ListOptions{ + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.uid", uid), + fields.OneTermEqualSelector("involvedObject.name", name), + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "DaemonSet")).String(), + ResourceVersion: "0", + }, + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "Pod")).String(), + ResourceVersion: "0", + }, + }) +} diff --git a/pkg/platform/proxy/apps/deployment/storage/event.go b/pkg/platform/proxy/apps/deployment/storage/event.go index 42bfe48f3..fefcdc1f5 100644 --- a/pkg/platform/proxy/apps/deployment/storage/event.go +++ b/pkg/platform/proxy/apps/deployment/storage/event.go @@ -21,7 +21,6 @@ package storage import ( "context" "sort" - "sync" "tkestack.io/tke/pkg/util/apiclient" @@ -62,28 +61,6 @@ func (r *EventREST) New() runtime.Object { return &corev1.EventList{} } -type eventsFinder struct { - wg sync.WaitGroup - mutex sync.Mutex - namespaceName string - platformClient platforminternalclient.PlatformInterface - client kubernetes.Clientset - ctx context.Context - events util.EventSlice - errors []error -} - -func newEventsFinder(ctx context.Context, namespaceName string, client kubernetes.Clientset, platformClient platforminternalclient.PlatformInterface) *eventsFinder { - return &eventsFinder{ - platformClient: platformClient, - ctx: ctx, - client: client, - namespaceName: namespaceName, - events: nil, - errors: make([]error, 0), - } -} - // Get retrieves the object from the storage. It is required to support Patch. func (r *EventREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { client, err := proxy.ClientSet(ctx, r.platformClient) @@ -96,52 +73,29 @@ func (r *EventREST) Get(ctx context.Context, name string, options *metav1.GetOpt return nil, errors.NewBadRequest("a namespace must be specified") } - ef := newEventsFinder(ctx, namespaceName, *client, r.platformClient) - if apiclient.ClusterVersionIsBefore19(client) { - return ef.listEventsByExtensions(ctx, client, namespaceName, name, options) + return listEventsByExtensions(ctx, client, namespaceName, name, options) } - return ef.listEventsByApps(ctx, client, namespaceName, name, options) + return listEventsByApps(ctx, client, namespaceName, name, options) } -func (ef *eventsFinder) getEvents(ctx context.Context, listOptions metav1.ListOptions) { - defer ef.wg.Done() - - events, err := ef.client.CoreV1().Events(ef.namespaceName).List(ctx, listOptions) - if err != nil { - ef.mutex.Lock() - ef.errors = append(ef.errors, err) - ef.mutex.Unlock() - return - } - if len(events.Items) == 0 { - return - } - - ef.mutex.Lock() - for _, event := range events.Items { - ef.events = append(ef.events, event) - } - ef.mutex.Unlock() -} - -func (ef *eventsFinder) listEventsByExtensions(ctx context.Context, client *kubernetes.Clientset, namespaceName, name string, options *metav1.GetOptions) (runtime.Object, error) { +func listEventsByExtensions(ctx context.Context, client *kubernetes.Clientset, namespaceName, name string, options *metav1.GetOptions) (runtime.Object, error) { deployment, err := client.ExtensionsV1beta1().Deployments(namespaceName).Get(ctx, name, *options) if err != nil { return nil, errors.NewNotFound(extensionsv1beta1.Resource("deployments/events"), name) } - deploymentSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(deployment.UID)), - fields.OneTermEqualSelector("involvedObject.name", deployment.Name), - fields.OneTermEqualSelector("involvedObject.namespace", deployment.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Deployment")) - deploymentListOptions := metav1.ListOptions{ - FieldSelector: deploymentSelector.String(), + var resultEvents util.EventSlice + + events, errs := getAboutDeployEvents(ctx, client, deployment.Name, deployment.Namespace, string(deployment.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - ef.wg.Add(1) - go ef.getEvents(ctx, deploymentListOptions) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(deployment.UID)]; ok { + resultEvents = append(resultEvents, v...) + } rsSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { @@ -170,16 +124,9 @@ func (ef *eventsFinder) listEventsByExtensions(ctx context.Context, client *kube continue } - rsEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(rs.UID)), - fields.OneTermEqualSelector("involvedObject.name", rs.Name), - fields.OneTermEqualSelector("involvedObject.namespace", rs.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "ReplicaSet")) - rsEventsListOptions := metav1.ListOptions{ - FieldSelector: rsEventsSelector.String(), + if v, ok := involvedObjectUIDMap[string(rs.UID)]; ok { + resultEvents = append(resultEvents, v...) } - ef.wg.Add(1) - go ef.getEvents(ctx, rsEventsListOptions) for _, references := range rs.ObjectMeta.OwnerReferences { if (references.Kind == "Deployment") && (references.Name == name) { @@ -192,19 +139,13 @@ func (ef *eventsFinder) listEventsByExtensions(ctx context.Context, client *kube if err != nil { return nil, err } + // Events cannot be queried for the deleted pods for _, pod := range podListByRS.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "ReplicaSet") && (podReferences.Name == rs.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } - ef.wg.Add(1) - go ef.getEvents(ctx, podEventsListOptions) } } } @@ -212,35 +153,30 @@ func (ef *eventsFinder) listEventsByExtensions(ctx context.Context, client *kube } } - ef.wg.Wait() - if len(ef.errors) > 0 { - return nil, utilerrors.NewAggregate(ef.errors) - } - - sort.Sort(ef.events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: ef.events, + Items: resultEvents, }, nil } -func (ef *eventsFinder) listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespaceName, name string, options *metav1.GetOptions) (runtime.Object, error) { +func listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespaceName, name string, options *metav1.GetOptions) (runtime.Object, error) { deployment, err := client.AppsV1().Deployments(namespaceName).Get(ctx, name, *options) if err != nil { return nil, errors.NewNotFound(appsv1.Resource("deployments/events"), name) } - deploymentSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(deployment.UID)), - fields.OneTermEqualSelector("involvedObject.name", deployment.Name), - fields.OneTermEqualSelector("involvedObject.namespace", deployment.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Deployment")) - deploymentListOptions := metav1.ListOptions{ - FieldSelector: deploymentSelector.String(), + var resultEvents util.EventSlice + + events, errs := getAboutDeployEvents(ctx, client, deployment.Name, deployment.Namespace, string(deployment.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - ef.wg.Add(1) - go ef.getEvents(ctx, deploymentListOptions) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(deployment.UID)]; ok { + resultEvents = append(resultEvents, v...) + } rsSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { @@ -269,16 +205,9 @@ func (ef *eventsFinder) listEventsByApps(ctx context.Context, client *kubernetes continue } - rsEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(rs.UID)), - fields.OneTermEqualSelector("involvedObject.name", rs.Name), - fields.OneTermEqualSelector("involvedObject.namespace", rs.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "ReplicaSet")) - rsEventsListOptions := metav1.ListOptions{ - FieldSelector: rsEventsSelector.String(), + if v, ok := involvedObjectUIDMap[string(rs.UID)]; ok { + resultEvents = append(resultEvents, v...) } - ef.wg.Add(1) - go ef.getEvents(ctx, rsEventsListOptions) for _, references := range rs.ObjectMeta.OwnerReferences { if (references.Kind == "Deployment") && (references.Name == name) { @@ -294,16 +223,9 @@ func (ef *eventsFinder) listEventsByApps(ctx context.Context, client *kubernetes for _, pod := range podListByRS.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "ReplicaSet") && (podReferences.Name == rs.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } - ef.wg.Add(1) - go ef.getEvents(ctx, podEventsListOptions) } } } @@ -311,14 +233,35 @@ func (ef *eventsFinder) listEventsByApps(ctx context.Context, client *kubernetes } } - ef.wg.Wait() - if len(ef.errors) > 0 { - return nil, utilerrors.NewAggregate(ef.errors) - } - - sort.Sort(ef.events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: ef.events, + Items: resultEvents, }, nil } + +// getAboutDeployEvents Query all events in the namespace or Query the Deployment ReplicaSet Pod asynchronously +func getAboutDeployEvents(ctx context.Context, client *kubernetes.Clientset, name, namespace, uid string) (util.EventSlice, []error) { + return util.GetResourcesEvents(ctx, client, namespace, []metav1.ListOptions{ + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.uid", uid), + fields.OneTermEqualSelector("involvedObject.name", name), + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "Deployment")).String(), + ResourceVersion: "0", + }, + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "ReplicaSet")).String(), + ResourceVersion: "0", + }, + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "Pod")).String(), + ResourceVersion: "0", + }, + }) +} diff --git a/pkg/platform/proxy/apps/statefulset/storage/event.go b/pkg/platform/proxy/apps/statefulset/storage/event.go index 0c26fce99..fb1a6ee50 100644 --- a/pkg/platform/proxy/apps/statefulset/storage/event.go +++ b/pkg/platform/proxy/apps/statefulset/storage/event.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" @@ -84,22 +85,16 @@ func listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespa return nil, errors.NewNotFound(appsv1.Resource("statefulsets/events"), name) } - selector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(statefulSet.UID)), - fields.OneTermEqualSelector("involvedObject.name", statefulSet.Name), - fields.OneTermEqualSelector("involvedObject.namespace", statefulSet.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "StatefulSet")) - listOptions := metav1.ListOptions{ - FieldSelector: selector.String(), - } - statefulSetEvents, err := client.CoreV1().Events(namespaceName).List(ctx, listOptions) - if err != nil { - return nil, err + var resultEvents util.EventSlice + + events, errs := getAboutStatefulSetEvents(ctx, client, name, namespaceName, string(statefulSet.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - var events util.EventSlice - for _, statefulSetEvent := range statefulSetEvents.Items { - events = append(events, statefulSetEvent) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(statefulSet.UID)]; ok { + resultEvents = append(resultEvents, v...) } podSelector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) @@ -117,30 +112,17 @@ func listEventsByApps(ctx context.Context, client *kubernetes.Clientset, namespa for _, pod := range podAllList.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "StatefulSet") && (podReferences.Name == statefulSet.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), - } - podEvents, err := client.CoreV1().Events(namespaceName).List(ctx, podEventsListOptions) - if err != nil { - return nil, err - } - - for _, podEvent := range podEvents.Items { - events = append(events, podEvent) + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } } } } - sort.Sort(events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: events, + Items: resultEvents, }, nil } @@ -150,22 +132,16 @@ func listEventsByAppsBeta(ctx context.Context, client *kubernetes.Clientset, nam return nil, errors.NewNotFound(appsv1beta1.Resource("statefulsets/events"), name) } - selector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(statefulSet.UID)), - fields.OneTermEqualSelector("involvedObject.name", statefulSet.Name), - fields.OneTermEqualSelector("involvedObject.namespace", statefulSet.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "StatefulSet")) - listOptions := metav1.ListOptions{ - FieldSelector: selector.String(), - } - statefulSetEvents, err := client.CoreV1().Events(namespaceName).List(ctx, listOptions) - if err != nil { - return nil, err + var resultEvents util.EventSlice + + events, errs := getAboutStatefulSetEvents(ctx, client, name, namespaceName, string(statefulSet.UID)) + if len(errs) > 0 { + return nil, utilerrors.NewAggregate(errs) } - var events util.EventSlice - for _, statefulSetEvent := range statefulSetEvents.Items { - events = append(events, statefulSetEvent) + involvedObjectUIDMap := util.GetInvolvedObjectUIDMap(events) + if v, ok := involvedObjectUIDMap[string(statefulSet.UID)]; ok { + resultEvents = append(resultEvents, v...) } podSelector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) @@ -183,29 +159,36 @@ func listEventsByAppsBeta(ctx context.Context, client *kubernetes.Clientset, nam for _, pod := range podAllList.Items { for _, podReferences := range pod.ObjectMeta.OwnerReferences { if (podReferences.Kind == "StatefulSet") && (podReferences.Name == statefulSet.Name) { - podEventsSelector := fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.uid", string(pod.UID)), - fields.OneTermEqualSelector("involvedObject.name", pod.Name), - fields.OneTermEqualSelector("involvedObject.namespace", pod.Namespace), - fields.OneTermEqualSelector("involvedObject.kind", "Pod")) - podEventsListOptions := metav1.ListOptions{ - FieldSelector: podEventsSelector.String(), - } - podEvents, err := client.CoreV1().Events(namespaceName).List(ctx, podEventsListOptions) - if err != nil { - return nil, err - } - - for _, podEvent := range podEvents.Items { - events = append(events, podEvent) + if v, ok := involvedObjectUIDMap[string(pod.UID)]; ok { + resultEvents = append(resultEvents, v...) } } } } - sort.Sort(events) + sort.Sort(resultEvents) return &corev1.EventList{ - Items: events, + Items: resultEvents, }, nil } + +// getAboutStatefulSetEvents Query all events in the namespace or Query the StatefulSet Pod asynchronously +func getAboutStatefulSetEvents(ctx context.Context, client *kubernetes.Clientset, name, namespace, uid string) (util.EventSlice, []error) { + return util.GetResourcesEvents(ctx, client, namespace, []metav1.ListOptions{ + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.uid", uid), + fields.OneTermEqualSelector("involvedObject.name", name), + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "StatefulSet")).String(), + ResourceVersion: "0", + }, + { + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.namespace", namespace), + fields.OneTermEqualSelector("involvedObject.kind", "Pod")).String(), + ResourceVersion: "0", + }, + }) +} diff --git a/pkg/platform/util/event.go b/pkg/platform/util/event.go index 6abf44f66..f20f93ed7 100644 --- a/pkg/platform/util/event.go +++ b/pkg/platform/util/event.go @@ -20,6 +20,7 @@ package util import ( "context" + "sync" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,3 +55,51 @@ func GetEvents(ctx context.Context, client *kubernetes.Clientset, uid, namespace } return client.CoreV1().Events(namespace).List(ctx, listOptions) } + +// GetInvolvedObjectUIDMap Get uid events map +func GetInvolvedObjectUIDMap(events EventSlice) map[string][]corev1.Event { + involvedObjectUIDMap := make(map[string][]corev1.Event) + for _, event := range events { + if v, ok := involvedObjectUIDMap[string(event.InvolvedObject.UID)]; ok { + involvedObjectUIDMap[string(event.InvolvedObject.UID)] = append(v, event) + continue + } + involvedObjectUIDMap[string(event.InvolvedObject.UID)] = EventSlice{ + event, + } + } + return involvedObjectUIDMap +} + +// GetResourcesEvents list the resources events by resource namespace. +func GetResourcesEvents(ctx context.Context, client *kubernetes.Clientset, namespace string, listOptions []metav1.ListOptions) (EventSlice, []error) { + var wg sync.WaitGroup + var mutex sync.Mutex + + var resultEvents EventSlice + errors := make([]error, 0) + + for _, listOption := range listOptions { + wg.Add(1) + go func(listOption metav1.ListOptions) { + defer wg.Done() + events, err := client.CoreV1().Events(namespace).List(ctx, listOption) + if err != nil { + mutex.Lock() + errors = append(errors, err) + mutex.Unlock() + return + } + if len(events.Items) == 0 { + return + } + mutex.Lock() + for _, event := range events.Items { + resultEvents = append(resultEvents, event) + } + mutex.Unlock() + }(listOption) + } + wg.Wait() + return resultEvents, errors +}