Skip to content

Commit

Permalink
vpa-admission-controller: Wire contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
ialidzhikov committed Jul 2, 2024
1 parent 965c84a commit c29cb95
Show file tree
Hide file tree
Showing 20 changed files with 115 additions and 88 deletions.
19 changes: 11 additions & 8 deletions vertical-pod-autoscaler/pkg/admission-controller/logic/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package logic

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"k8s.io/api/admission/v1"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod"
Expand Down Expand Up @@ -56,12 +57,12 @@ func (s *AdmissionServer) RegisterResourceHandler(resourceHandler resource.Handl
s.resourceHandlers[resourceHandler.GroupResource()] = resourceHandler
}

func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_admission.AdmissionStatus, metrics_admission.AdmissionResource) {
func (s *AdmissionServer) admit(ctx context.Context, data []byte) (*admissionv1.AdmissionResponse, metrics_admission.AdmissionStatus, metrics_admission.AdmissionResource) {
// we don't block the admission by default, even on unparsable JSON
response := v1.AdmissionResponse{}
response := admissionv1.AdmissionResponse{}
response.Allowed = true

ar := v1.AdmissionReview{}
ar := admissionv1.AdmissionReview{}
if err := json.Unmarshal(data, &ar); err != nil {
klog.Error(err)
return &response, metrics_admission.Error, metrics_admission.Unknown
Expand All @@ -80,7 +81,7 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm

handler, ok := s.resourceHandlers[admittedGroupResource]
if ok {
patches, err = handler.GetPatches(ar.Request)
patches, err = handler.GetPatches(ctx, ar.Request)
resource = handler.AdmissionResource()

if handler.DisallowIncorrectObjects() && err != nil {
Expand All @@ -106,7 +107,7 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm
klog.Errorf("Cannot marshal the patch %v: %v", patches, err)
return &response, metrics_admission.Error, resource
}
patchType := v1.PatchTypeJSONPatch
patchType := admissionv1.PatchTypeJSONPatch
response.PatchType = &patchType
response.Patch = patch
klog.V(4).Infof("Sending patches: %v", patches)
Expand All @@ -127,6 +128,8 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm

// Serve is a handler function of AdmissionServer
func (s *AdmissionServer) Serve(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

executionTimer := metrics_admission.NewExecutionTimer()
defer executionTimer.ObserveTotal()
admissionLatency := metrics_admission.NewAdmissionLatency()
Expand All @@ -146,8 +149,8 @@ func (s *AdmissionServer) Serve(w http.ResponseWriter, r *http.Request) {
}
executionTimer.ObserveStep("read_request")

reviewResponse, status, resource := s.admit(body)
ar := v1.AdmissionReview{
reviewResponse, status, resource := s.admit(ctx, body)
ar := admissionv1.AdmissionReview{
Response: reviewResponse,
TypeMeta: metav1.TypeMeta{
Kind: "AdmissionReview",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package resource

import (
"context"

v1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/admission"
Expand All @@ -38,5 +40,5 @@ type Handler interface {
// DisallowIncorrectObjects returns whether incorrect objects (eg. unparsable, not passing validations) should be disallowed by Admission Server.
DisallowIncorrectObjects() bool
// GetPatches returns patches for given AdmissionRequest
GetPatches(*v1.AdmissionRequest) ([]PatchRecord, error)
GetPatches(context.Context, *v1.AdmissionRequest) ([]PatchRecord, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package pod

import (
"context"
"encoding/json"
"fmt"

admissionv1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
resource_admission "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
Expand Down Expand Up @@ -63,12 +64,12 @@ func (h *resourceHandler) DisallowIncorrectObjects() bool {
}

// GetPatches builds patches for Pod in given admission request.
func (h *resourceHandler) GetPatches(ar *admissionv1.AdmissionRequest) ([]resource_admission.PatchRecord, error) {
func (h *resourceHandler) GetPatches(ctx context.Context, ar *admissionv1.AdmissionRequest) ([]resource_admission.PatchRecord, error) {
if ar.Resource.Version != "v1" {
return nil, fmt.Errorf("only v1 Pods are supported")
}
raw, namespace := ar.Object.Raw, ar.Namespace
pod := v1.Pod{}
pod := corev1.Pod{}
if err := json.Unmarshal(raw, &pod); err != nil {
return nil, err
}
Expand All @@ -77,7 +78,7 @@ func (h *resourceHandler) GetPatches(ar *admissionv1.AdmissionRequest) ([]resour
pod.Namespace = namespace
}
klog.V(4).Infof("Admitting pod %s", klog.KObj(&pod))
controllingVpa := h.vpaMatcher.GetMatchingVPA(&pod)
controllingVpa := h.vpaMatcher.GetMatchingVPA(ctx, &pod)
if controllingVpa == nil {
klog.V(4).Infof("No matching VPA found for pod %s", klog.KObj(&pod))
return []resource_admission.PatchRecord{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pod

import (
"context"
"fmt"
"testing"

Expand All @@ -43,7 +44,7 @@ type fakeVpaMatcher struct {
vpa *vpa_types.VerticalPodAutoscaler
}

func (m *fakeVpaMatcher) GetMatchingVPA(_ *apiv1.Pod) *vpa_types.VerticalPodAutoscaler {
func (m *fakeVpaMatcher) GetMatchingVPA(_ context.Context, _ *apiv1.Pod) *vpa_types.VerticalPodAutoscaler {
return m.vpa
}

Expand Down Expand Up @@ -176,7 +177,7 @@ func TestGetPatches(t *testing.T) {
fppp := &fakePodPreProcessor{tc.podPreProcessorError}
fvm := &fakeVpaMatcher{vpa: tc.vpa}
h := NewResourceHandler(fppp, fvm, tc.calculators)
patches, err := h.GetPatches(&admissionv1.AdmissionRequest{
patches, err := h.GetPatches(context.Background(), &admissionv1.AdmissionRequest{
Resource: v1.GroupVersionResource{
Version: "v1",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vpa

import (
"context"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -71,7 +72,7 @@ func (h *resourceHandler) DisallowIncorrectObjects() bool {
}

// GetPatches builds patches for VPA in given admission request.
func (h *resourceHandler) GetPatches(ar *v1.AdmissionRequest) ([]resource.PatchRecord, error) {
func (h *resourceHandler) GetPatches(_ context.Context, ar *v1.AdmissionRequest) ([]resource.PatchRecord, error) {
raw, isCreate := ar.Object.Raw, ar.Operation == v1.Create
vpa, err := parseVPA(raw)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package vpa

import (
"context"

core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
Expand All @@ -31,7 +33,7 @@ import (
// Matcher is capable of returning a single matching VPA object
// for a pod. Will return nil if no matching object is found.
type Matcher interface {
GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
GetMatchingVPA(ctx context.Context, pod *core.Pod) *vpa_types.VerticalPodAutoscaler
}

type matcher struct {
Expand All @@ -49,7 +51,7 @@ func NewMatcher(vpaLister vpa_lister.VerticalPodAutoscalerLister,
controllerFetcher: controllerFetcher}
}

func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler {
func (m *matcher) GetMatchingVPA(ctx context.Context, pod *core.Pod) *vpa_types.VerticalPodAutoscaler {
configs, err := m.vpaLister.VerticalPodAutoscalers(pod.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("failed to get vpa configs: %v", err)
Expand All @@ -60,7 +62,7 @@ func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
if vpa_api_util.GetUpdateMode(vpaConfig) == vpa_types.UpdateModeOff {
continue
}
selector, err := m.selectorFetcher.Fetch(vpaConfig)
selector, err := m.selectorFetcher.Fetch(ctx, vpaConfig)
if err != nil {
klog.V(3).Infof("skipping VPA object %s because we cannot fetch selector: %s", klog.KObj(vpaConfig), err)
continue
Expand All @@ -71,7 +73,7 @@ func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
})
}
klog.V(2).Infof("Let's choose from %d configs for pod %s", len(onConfigs), klog.KObj(pod))
result := vpa_api_util.GetControllingVPAForPod(pod, onConfigs, m.controllerFetcher)
result := vpa_api_util.GetControllingVPAForPod(ctx, pod, onConfigs, m.controllerFetcher)
if result != nil {
return result.Vpa
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vpa

import (
"context"
"testing"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -148,7 +149,7 @@ func TestGetMatchingVpa(t *testing.T) {
// The hierarchy part is being test in the "TestControllerFetcher" test.
matcher := NewMatcher(vpaLister, mockSelectorFetcher, controllerfetcher.FakeControllerFetcher{})

vpa := matcher.GetMatchingVPA(tc.pod)
vpa := matcher.GetMatchingVPA(context.Background(), tc.pod)
if tc.expectedFound && assert.NotNil(t, vpa) {
assert.Equal(t, tc.expectedVpaName, vpa.Name)
} else {
Expand Down
20 changes: 10 additions & 10 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ClusterStateFeeder interface {
InitFromCheckpoints()

// LoadVPAs updates clusterState with current state of VPAs.
LoadVPAs()
LoadVPAs(ctx context.Context)

// LoadPods updates clusterState with current specification of Pods and their Containers.
LoadPods()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.Vertica

func (feeder *clusterStateFeeder) InitFromCheckpoints() {
klog.V(3).Info("Initializing VPA from checkpoints")
feeder.LoadVPAs()
feeder.LoadVPAs(context.TODO())

namespaces := make(map[string]bool)
for _, v := range feeder.clusterState.Vpas {
Expand All @@ -270,7 +270,7 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {

func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
klog.V(3).Info("Starting garbage collection of checkpoints")
feeder.LoadVPAs()
feeder.LoadVPAs(context.TODO())

namespaceList, err := feeder.coreClient.Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand Down Expand Up @@ -338,7 +338,7 @@ func filterVPAs(feeder *clusterStateFeeder, allVpaCRDs []*vpa_types.VerticalPodA
}

// LoadVPAs fetches VPA objects and loads them into the cluster state.
func (feeder *clusterStateFeeder) LoadVPAs() {
func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
// List VPA API objects.
allVpaCRDs, err := feeder.vpaLister.List(labels.Everything())
if err != nil {
Expand All @@ -358,7 +358,7 @@ func (feeder *clusterStateFeeder) LoadVPAs() {
VpaName: vpaCRD.Name,
}

selector, conditions := feeder.getSelector(vpaCRD)
selector, conditions := feeder.getSelector(ctx, vpaCRD)
klog.V(4).Infof("Using selector %s for VPA %s", selector.String(), klog.KObj(vpaCRD))

if feeder.clusterState.AddOrUpdateVpa(vpaCRD, selector) == nil {
Expand Down Expand Up @@ -486,7 +486,7 @@ type condition struct {
message string
}

func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAutoscaler) (bool, condition) {
func (feeder *clusterStateFeeder) validateTargetRef(ctx context.Context, vpa *vpa_types.VerticalPodAutoscaler) (bool, condition) {
//
if vpa.Spec.TargetRef == nil {
return false, condition{}
Expand All @@ -499,7 +499,7 @@ func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAu
},
ApiVersion: vpa.Spec.TargetRef.APIVersion,
}
top, err := feeder.controllerFetcher.FindTopMostWellKnownOrScalable(&k)
top, err := feeder.controllerFetcher.FindTopMostWellKnownOrScalable(ctx, &k)
if err != nil {
return false, condition{conditionType: vpa_types.ConfigUnsupported, delete: false, message: fmt.Sprintf("Error checking if target is a topmost well-known or scalable controller: %s", err)}
}
Expand All @@ -512,10 +512,10 @@ func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAu
return true, condition{}
}

func (feeder *clusterStateFeeder) getSelector(vpa *vpa_types.VerticalPodAutoscaler) (labels.Selector, []condition) {
selector, fetchErr := feeder.selectorFetcher.Fetch(vpa)
func (feeder *clusterStateFeeder) getSelector(ctx context.Context, vpa *vpa_types.VerticalPodAutoscaler) (labels.Selector, []condition) {
selector, fetchErr := feeder.selectorFetcher.Fetch(ctx, vpa)
if selector != nil {
validTargetRef, unsupportedCondition := feeder.validateTargetRef(vpa)
validTargetRef, unsupportedCondition := feeder.validateTargetRef(ctx, vpa)
if !validTargetRef {
return labels.Nothing(), []condition{
unsupportedCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package input

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -32,7 +33,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/history"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/spec"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
target_mock "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/mock"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
)
Expand All @@ -42,7 +43,7 @@ type fakeControllerFetcher struct {
err error
}

func (f *fakeControllerFetcher) FindTopMostWellKnownOrScalable(_ *controllerfetcher.ControllerKeyWithAPIVersion) (*controllerfetcher.ControllerKeyWithAPIVersion, error) {
func (f *fakeControllerFetcher) FindTopMostWellKnownOrScalable(_ context.Context, _ *controllerfetcher.ControllerKeyWithAPIVersion) (*controllerfetcher.ControllerKeyWithAPIVersion, error) {
return f.key, f.err
}

Expand Down Expand Up @@ -315,7 +316,7 @@ func TestLoadPods(t *testing.T) {
if tc.expectedVpaFetch {
targetSelectorFetcher.EXPECT().Fetch(vpa).Return(tc.selector, tc.fetchSelectorError)
}
clusterStateFeeder.LoadVPAs()
clusterStateFeeder.LoadVPAs(context.Background())

vpaID := model.VpaID{
Namespace: vpa.Namespace,
Expand Down
Loading

0 comments on commit c29cb95

Please sign in to comment.