Skip to content

Commit

Permalink
Make Event on Evict
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianmoisey committed Oct 17, 2024
1 parent 64a6432 commit 83759d4
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
type PodsEvictionRestriction interface {
// Evict sends eviction instruction to the api client.
// Returns error if pod cannot be evicted or if client returned error.
Evict(pod *apiv1.Pod, eventRecorder record.EventRecorder) error
Evict(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error
// CanEvict checks if pod can be safely evicted
CanEvict(pod *apiv1.Pod) bool
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool {

// Evict sends eviction instruction to api client. Returns error if pod cannot be evicted or if client returned error
// Does not check if pod was actually evicted after eviction grace period.
func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, eventRecorder record.EventRecorder) error {
func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
cr, present := e.podToReplicaCreatorMap[getPodID(podToEvict)]
if !present {
return fmt.Errorf("pod not suitable for eviction %s/%s: not in replicated pods map", podToEvict.Namespace, podToEvict.Name)
Expand All @@ -146,6 +146,9 @@ func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, eventRecorder
eventRecorder.Event(podToEvict, apiv1.EventTypeNormal, "EvictedByVPA",
"Pod was evicted by VPA Updater to apply resource recommendation.")

eventRecorder.Event(vpa, apiv1.EventTypeNormal, "EvictedPod",
"VPA 4 Updater evicted pod "+podToEvict.Namespace+"/"+podToEvict.Name)

if podToEvict.Status.Phase != apiv1.PodPending {
singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
if !present {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -38,6 +40,7 @@ type podWithExpectations struct {
pod *apiv1.Pod
canEvict bool
evictionSuccess bool
events int
}

func getBasicVpa() *vpa_types.VerticalPodAutoscaler {
Expand Down Expand Up @@ -281,15 +284,14 @@ func TestEvictReplicatedByController(t *testing.T) {
assert.Equalf(t, p.canEvict, eviction.CanEvict(p.pod), "TC %v - unexpected CanEvict result for pod-%v %#v", testCase.name, i, p.pod)
}
for i, p := range testCase.pods {
err := eviction.Evict(p.pod, test.FakeEventRecorder())
err := eviction.Evict(p.pod, testCase.vpa, test.FakeEventRecorder())
if p.evictionSuccess {
assert.NoErrorf(t, err, "TC %v - unexpected Evict result for pod-%v %#v", testCase.name, i, p.pod)
} else {
assert.Errorf(t, err, "TC %v - unexpected Evict result for pod-%v %#v", testCase.name, i, p.pod)
}
}
}

}

func TestEvictReplicatedByReplicaSet(t *testing.T) {
Expand Down Expand Up @@ -322,11 +324,11 @@ func TestEvictReplicatedByReplicaSet(t *testing.T) {
}

for _, pod := range pods[:2] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[2:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -361,11 +363,11 @@ func TestEvictReplicatedByStatefulSet(t *testing.T) {
}

for _, pod := range pods[:2] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[2:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -398,11 +400,11 @@ func TestEvictReplicatedByDaemonSet(t *testing.T) {
}

for _, pod := range pods[:2] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[2:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -433,11 +435,11 @@ func TestEvictReplicatedByJob(t *testing.T) {
}

for _, pod := range pods[:2] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[2:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -472,7 +474,7 @@ func TestEvictTooFewReplicas(t *testing.T) {
}

for _, pod := range pods {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -508,11 +510,11 @@ func TestEvictionTolerance(t *testing.T) {
}

for _, pod := range pods[:4] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[4:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}
Expand Down Expand Up @@ -548,15 +550,101 @@ func TestEvictAtLeastOne(t *testing.T) {
}

for _, pod := range pods[:1] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Nil(t, err, "Should evict with no error")
}
for _, pod := range pods[1:] {
err := eviction.Evict(pod, test.FakeEventRecorder())
err := eviction.Evict(pod, getBasicVpa(), test.FakeEventRecorder())
assert.Error(t, err, "Error expected")
}
}

func TestEvictEmitEvent(t *testing.T) {
rc := apiv1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Name: "rc",
Namespace: "default",
},
TypeMeta: metav1.TypeMeta{
Kind: "ReplicationController",
},
}

index := 0
generatePod := func() test.PodBuilder {
index++
return test.Pod().WithName(fmt.Sprintf("test-%v", index)).WithCreator(&rc.ObjectMeta, &rc.TypeMeta)
}

testCases := []struct {
name string
replicas int32
evictionTolerance float64
vpa *vpa_types.VerticalPodAutoscaler
pods []podWithExpectations
}{
{
name: "Pods that can be evicted",
replicas: 4,
evictionTolerance: 0.5,
vpa: getBasicVpa(),
pods: []podWithExpectations{
{
pod: generatePod().WithPhase(apiv1.PodPending).Get(),
canEvict: true,
evictionSuccess: true,
},
{
pod: generatePod().WithPhase(apiv1.PodPending).Get(),
canEvict: true,
evictionSuccess: true,
},
},
},
{
name: "Pod that can not be evicted",
replicas: 4,
evictionTolerance: 0.5,
vpa: getBasicVpa(),
pods: []podWithExpectations{

{
pod: generatePod().Get(),
canEvict: false,
evictionSuccess: false,
},
},
},
}

for _, testCase := range testCases {
rc.Spec = apiv1.ReplicationControllerSpec{
Replicas: &testCase.replicas,
}
pods := make([]*apiv1.Pod, 0, len(testCase.pods))
for _, p := range testCase.pods {
pods = append(pods, p.pod)
}
factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 2, testCase.evictionTolerance)
eviction := factory.NewPodsEvictionRestriction(pods, testCase.vpa)

for _, p := range testCase.pods {
mockRecorder := test.MockEventRecorder()
mockRecorder.On("Event", mock.Anything, apiv1.EventTypeNormal, "EvictedByVPA", mock.Anything).Return()
mockRecorder.On("Event", mock.Anything, apiv1.EventTypeNormal, "EvictedPod", mock.Anything).Return()

eviction.Evict(p.pod, testCase.vpa, mockRecorder)

if p.canEvict {
mockRecorder.AssertNumberOfCalls(t, "Event", 2)

} else {
mockRecorder.AssertNumberOfCalls(t, "Event", 0)
}
}
}
}

func getEvictionRestrictionFactory(rc *apiv1.ReplicationController, rs *appsv1.ReplicaSet,
ss *appsv1.StatefulSet, ds *appsv1.DaemonSet, minReplicas int,
evictionToleranceFraction float64) (PodsEvictionRestrictionFactory, error) {
Expand Down
14 changes: 11 additions & 3 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"

corescheme "k8s.io/client-go/kubernetes/scheme"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +39,7 @@ import (

vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/scheme"
vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
Expand Down Expand Up @@ -226,7 +228,7 @@ func (u *updater) RunOnce(ctx context.Context) {
return
}
klog.V(2).Infof("evicting pod %s", klog.KObj(pod))
evictErr := evictionLimiter.Evict(pod, u.eventRecorder)
evictErr := evictionLimiter.Evict(pod, vpa, u.eventRecorder)
if evictErr != nil {
klog.Warningf("evicting pod %s failed: %v", klog.KObj(pod), evictErr)
} else {
Expand Down Expand Up @@ -311,6 +313,12 @@ func newEventRecorder(kubeClient kube_client.Interface) record.EventRecorder {
eventBroadcaster.StartLogging(klog.V(4).Infof)
if _, isFake := kubeClient.(*fake.Clientset); !isFake {
eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: clientv1.New(kubeClient.CoreV1().RESTClient()).Events("")})
} else {
eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
}
return eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "vpa-updater"})

vpascheme := scheme.Scheme
corescheme.AddToScheme(vpascheme)

return eventBroadcaster.NewRecorder(vpascheme, apiv1.EventSource{Component: "vpa-updater"})
}
49 changes: 49 additions & 0 deletions vertical-pod-autoscaler/pkg/updater/logic/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"

vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
Expand Down Expand Up @@ -357,3 +359,50 @@ func TestRunOnceIgnoreNamespaceMatching(t *testing.T) {
updater.RunOnce(context.Background())
eviction.AssertNumberOfCalls(t, "Evict", 0)
}

func TestNewEventRecorderLooping(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
er := newEventRecorder(fakeClient)

testCases := []struct {
reason string
object runtime.Object
message string
}{
{
reason: "EvictedPod",
object: &apiv1.Pod{},
message: "Evicted pod",
},
{
reason: "EvictedPod",
object: &vpa_types.VerticalPodAutoscaler{},
message: "Evicted pod",
},
}
for _, tc := range testCases {
t.Run(tc.reason, func(t *testing.T) {
er.Event(tc.object, apiv1.EventTypeNormal, tc.reason, tc.message)

var events *apiv1.EventList
var err error
// Add delay for fake client to catch up due to be being asynchronous
for i := 0; i < 5; i++ {
events, err = fakeClient.CoreV1().Events("default").List(context.TODO(), metav1.ListOptions{})
if err == nil && len(events.Items) > 0 {
break
}
time.Sleep(100 * time.Millisecond)
}

assert.NoError(t, err, "should be able to list events")
assert.Equal(t, 1, len(events.Items), "should have exactly 1 event")

event := events.Items[0]
assert.Equal(t, tc.reason, event.Reason)
assert.Equal(t, tc.message, event.Message)
assert.Equal(t, apiv1.EventTypeNormal, event.Type)
assert.Equal(t, "vpa-updater", event.Source.Component)
})
}
}
31 changes: 29 additions & 2 deletions vertical-pod-autoscaler/pkg/utils/test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/listers/core/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"

vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
Expand Down Expand Up @@ -110,7 +110,7 @@ type PodsEvictionRestrictionMock struct {
}

// Evict is a mock implementation of PodsEvictionRestriction.Evict
func (m *PodsEvictionRestrictionMock) Evict(pod *apiv1.Pod, eventRecorder record.EventRecorder) error {
func (m *PodsEvictionRestrictionMock) Evict(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
args := m.Called(pod, eventRecorder)
return args.Error(0)
}
Expand Down Expand Up @@ -268,3 +268,30 @@ func (f *fakeEventRecorder) AnnotatedEventf(object runtime.Object, annotations m
func FakeEventRecorder() record.EventRecorder {
return &fakeEventRecorder{}
}

// mockedEventRecorder is a dummy implementation of record.EventRecorder.
type mockedventRecorder struct {
mock.Mock
}

// Event is a dummy implementation of record.EventRecorder interface.
func (m *mockedventRecorder) Event(object runtime.Object, eventtype, reason, message string) {
m.Called(object, eventtype, reason, message)
}

// Eventf is a dummy implementation of record.EventRecorder interface.
func (m *mockedventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}

// PastEventf is a dummy implementation of record.EventRecorder interface.
func (m *mockedventRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}

// AnnotatedEventf is a dummy implementation of record.EventRecorder interface.
func (m *mockedventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
}

// MockEventRecorder returns a dummy implementation of record.EventRecorder.
func MockEventRecorder() *mockedventRecorder {
return &mockedventRecorder{}
}

0 comments on commit 83759d4

Please sign in to comment.