Skip to content

Commit

Permalink
feat(admission server): refactor admission webhooks
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <wassim.dhif@datadoghq.com>
  • Loading branch information
wdhif committed Aug 16, 2024
1 parent d448715 commit b88e005
Show file tree
Hide file tree
Showing 23 changed files with 469 additions and 516 deletions.
228 changes: 49 additions & 179 deletions cmd/cluster-agent/admission/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

authenticationv1 "k8s.io/api/authentication/v1"

admicommon "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/metrics"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
Expand All @@ -29,7 +30,6 @@ import (

admiv1 "k8s.io/api/admission/v1"
admiv1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/dynamic"
Expand All @@ -38,8 +38,8 @@ import (

const jsonContentType = "application/json"

// ValidateRequest contains the information of a validation request
type ValidateRequest struct {
// Request contains the information of a validation request
type Request struct {
// Raw is the raw request object
Raw []byte
// Name is the name of the object
Expand All @@ -54,27 +54,8 @@ type ValidateRequest struct {
APIClient kubernetes.Interface
}

// MutateRequest contains the information of a mutation request
type MutateRequest struct {
// Raw is the raw request object
Raw []byte
// Name is the name of the object
Name string
// Namespace is the namespace of the object
Namespace string
// UserInfo contains information about the requesting user
UserInfo *authenticationv1.UserInfo
// DynamicClient holds a dynamic Kubernetes client
DynamicClient dynamic.Interface
// APIClient holds a Kubernetes client
APIClient kubernetes.Interface
}

// ValidatingWebhookFunc is the function that runs the validation webhook logic
type ValidatingWebhookFunc func(request *ValidateRequest) (bool, error)

// MutatingWebhookFunc is the function that runs the mutating webhook logic
type MutatingWebhookFunc func(request *MutateRequest) ([]byte, error)
// WebhookFunc is the generic function type that runs either a validating or mutating webhook logic
type WebhookFunc func(request *Request) *admiv1.AdmissionResponse

// Server TODO <container-integrations>
type Server struct {
Expand Down Expand Up @@ -109,19 +90,11 @@ func (s *Server) initDecoder() {
s.decoder = serializer.NewCodecFactory(scheme).UniversalDeserializer()
}

// RegisterValidatingWebhook adds a Validating admission webhook handler.
// It must be called to register the desired webhook handlers before calling Run.
func (s *Server) RegisterValidatingWebhook(uri string, webhookName string, f ValidatingWebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
s.mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
s.validateHandler(w, r, webhookName, f, dc, apiClient)
})
}

// RegisterMutatingWebhook adds a Mutating admission webhook handler.
// RegisterWebhook adds a Validating admission webhook handler.
// It must be called to register the desired webhook handlers before calling Run.
func (s *Server) RegisterMutatingWebhook(uri string, webhookName string, f MutatingWebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
func (s *Server) RegisterWebhook(uri string, webhookName string, webhookType admicommon.WebhookType, f WebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
s.mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
s.mutateHandler(w, r, webhookName, f, dc, apiClient)
s.Handle(w, r, webhookName, webhookType, f, dc, apiClient)
})
}

Expand Down Expand Up @@ -161,43 +134,54 @@ func (s *Server) Run(mainCtx context.Context, client kubernetes.Interface) error
return server.Shutdown(shutdownCtx)
}

// validateHandler contains the main logic responsible for handling validation requests.
// Handle contains the main logic responsible for handling admission requests.
// It supports both v1 and v1beta1 requests.
func (s *Server) validateHandler(w http.ResponseWriter, r *http.Request, webhookName string, validateFunc ValidatingWebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
metrics.ValidatingWebhooksReceived.Inc(webhookName)
func (s *Server) Handle(w http.ResponseWriter, r *http.Request, webhookName string, webhookType admicommon.WebhookType, webhookFunc WebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
// Increment the metrics for the received webhook
if webhookType == admicommon.ValidatingWebhook {
metrics.ValidatingWebhooksReceived.Inc(webhookName)
} else if webhookType == admicommon.MutatingWebhook {
metrics.MutatingWebhooksReceived.Inc(webhookName)
}

// Measure the time it takes to process the request
start := time.Now()
defer func() {
metrics.ValidatingWebhooksResponseDuration.Observe(time.Since(start).Seconds(), webhookName)
if webhookType == admicommon.ValidatingWebhook {
metrics.ValidatingWebhooksResponseDuration.Observe(time.Since(start).Seconds(), webhookName)
} else if webhookType == admicommon.MutatingWebhook {
metrics.MutatingWebhooksResponseDuration.Observe(time.Since(start).Seconds(), webhookName)
}
}()

// Validate admission request
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
log.Warnf("Invalid method %s, only POST requests are allowed", r.Method)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Could not read request body: %v", err)
return
}
defer r.Body.Close()

if contentType := r.Header.Get("Content-Type"); contentType != jsonContentType {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Unsupported content type %s, only %s is supported", contentType, jsonContentType)
return
}

// Deserialize admission request
obj, gvk, err := s.decoder.Decode(body, nil, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Could not deserialize request: %v", err)
return
}

// Handle the request based on GroupVersionKind
var response runtime.Object
switch *gvk {
case admiv1.SchemeGroupVersion.WithKind("AdmissionReview"):
Expand All @@ -207,108 +191,26 @@ func (s *Server) validateHandler(w http.ResponseWriter, r *http.Request, webhook
}
admissionReviewResp := &admiv1.AdmissionReview{}
admissionReviewResp.SetGroupVersionKind(*gvk)
validateRequest := ValidateRequest{
Raw: admissionReviewReq.Request.Object.Raw,
Name: admissionReviewReq.Request.Name,
Namespace: admissionReviewReq.Request.Namespace,
UserInfo: &admissionReviewReq.Request.UserInfo,
DynamicClient: dc,
APIClient: apiClient,
}
validation, err := validateFunc(&validateRequest)
admissionReviewResp.Response = validationResponse(validation, err)
admissionReviewResp.Response.UID = admissionReviewReq.Request.UID
response = admissionReviewResp
case admiv1beta1.SchemeGroupVersion.WithKind("AdmissionReview"):
admissionReviewReq, ok := obj.(*admiv1beta1.AdmissionReview)
if !ok {
log.Errorf("Expected v1beta1.AdmissionReview, got type %T", obj)
}
admissionReviewResp := &admiv1beta1.AdmissionReview{}
admissionReviewResp.SetGroupVersionKind(*gvk)
mutateRequest := ValidateRequest{
admissionRequest := Request{
Raw: admissionReviewReq.Request.Object.Raw,
Name: admissionReviewReq.Request.Name,
Namespace: admissionReviewReq.Request.Namespace,
UserInfo: &admissionReviewReq.Request.UserInfo,
DynamicClient: dc,
APIClient: apiClient,
}
validation, err := validateFunc(&mutateRequest)
admissionReviewResp.Response = responseV1ToV1beta1(validationResponse(validation, err))
admissionReviewResp.Response.UID = admissionReviewReq.Request.UID
response = admissionReviewResp
default:
log.Errorf("Group version kind %v is not supported", gvk)
w.WriteHeader(http.StatusBadRequest)
return
}

encoder := json.NewEncoder(w)
err = encoder.Encode(&response)
if err != nil {
log.Warnf("Failed to encode the response: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

// mutateHandler contains the main logic responsible for handling mutation requests.
// It supports both v1 and v1beta1 requests.
func (s *Server) mutateHandler(w http.ResponseWriter, r *http.Request, webhookName string, mutateFunc MutatingWebhookFunc, dc dynamic.Interface, apiClient kubernetes.Interface) {
metrics.MutatingWebhooksReceived.Inc(webhookName)

start := time.Now()
defer func() {
metrics.MutatingWebhooksResponseDuration.Observe(time.Since(start).Seconds(), webhookName)
}()

if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
log.Warnf("Invalid method %s, only POST requests are allowed", r.Method)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Could not read request body: %v", err)
return
}
defer r.Body.Close()

if contentType := r.Header.Get("Content-Type"); contentType != jsonContentType {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Unsupported content type %s, only %s is supported", contentType, jsonContentType)
return
}

obj, gvk, err := s.decoder.Decode(body, nil, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Warnf("Could not deserialize request: %v", err)
return
}

var response runtime.Object
switch *gvk {
case admiv1.SchemeGroupVersion.WithKind("AdmissionReview"):
admissionReviewReq, ok := obj.(*admiv1.AdmissionReview)
if !ok {
log.Errorf("Expected v1.AdmissionReview, got type %T", obj)
// Generate admission response
if webhookType == admicommon.ValidatingWebhook {
validation := webhookFunc(&admissionRequest)
admissionReviewResp.Response = validation
} else if webhookType == admicommon.MutatingWebhook {
mutationResponse := webhookFunc(&admissionRequest)
admissionReviewResp.Response = mutationResponse
} else {
log.Errorf("Invalid webhook type %v", webhookType)
w.WriteHeader(http.StatusInternalServerError)
}
admissionReviewResp := &admiv1.AdmissionReview{}
admissionReviewResp.SetGroupVersionKind(*gvk)
mutateRequest := MutateRequest{
Raw: admissionReviewReq.Request.Object.Raw,
Name: admissionReviewReq.Request.Name,
Namespace: admissionReviewReq.Request.Namespace,
UserInfo: &admissionReviewReq.Request.UserInfo,
DynamicClient: dc,
APIClient: apiClient,
}
jsonPatch, err := mutateFunc(&mutateRequest)
admissionReviewResp.Response = mutationResponse(jsonPatch, err)
admissionReviewResp.Response.UID = admissionReviewReq.Request.UID
response = admissionReviewResp
case admiv1beta1.SchemeGroupVersion.WithKind("AdmissionReview"):
Expand All @@ -318,16 +220,26 @@ func (s *Server) mutateHandler(w http.ResponseWriter, r *http.Request, webhookNa
}
admissionReviewResp := &admiv1beta1.AdmissionReview{}
admissionReviewResp.SetGroupVersionKind(*gvk)
mutateRequest := MutateRequest{
admissionRequest := Request{
Raw: admissionReviewReq.Request.Object.Raw,
Name: admissionReviewReq.Request.Name,
Namespace: admissionReviewReq.Request.Namespace,
UserInfo: &admissionReviewReq.Request.UserInfo,
DynamicClient: dc,
APIClient: apiClient,
}
jsonPatch, err := mutateFunc(&mutateRequest)
admissionReviewResp.Response = responseV1ToV1beta1(mutationResponse(jsonPatch, err))

// Generate admission response
if webhookType == admicommon.ValidatingWebhook {
validation := webhookFunc(&admissionRequest)
admissionReviewResp.Response = responseV1ToV1beta1(validation)
} else if webhookType == admicommon.MutatingWebhook {
mutationResponse := webhookFunc(&admissionRequest)
admissionReviewResp.Response = responseV1ToV1beta1(mutationResponse)
} else {
log.Errorf("Invalid webhook type %v", webhookType)
w.WriteHeader(http.StatusInternalServerError)
}
admissionReviewResp.Response.UID = admissionReviewReq.Request.UID
response = admissionReviewResp
default:
Expand All @@ -345,48 +257,6 @@ func (s *Server) mutateHandler(w http.ResponseWriter, r *http.Request, webhookNa
}
}

// validationResponse returns the adequate v1.AdmissionResponse based on the mutation result.
func validationResponse(validation bool, err error) *admiv1.AdmissionResponse {
if err != nil {
log.Warnf("Failed to validate: %v", err)

return &admiv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
Allowed: false, //TODO We should have a DefaultValidation variable with false. Modify all occurences.
}

}

return &admiv1.AdmissionResponse{
Allowed: validation,
}
}

// mutationResponse returns the adequate v1.AdmissionResponse based on the mutation result.
func mutationResponse(jsonPatch []byte, err error) *admiv1.AdmissionResponse {
if err != nil {
log.Warnf("Failed to mutate: %v", err)

return &admiv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
Allowed: true,
}

}

patchType := admiv1.PatchTypeJSONPatch

return &admiv1.AdmissionResponse{
Patch: jsonPatch,
PatchType: &patchType,
Allowed: true,
}
}

// responseV1ToV1beta1 converts a v1.AdmissionResponse into a v1beta1.AdmissionResponse.
func responseV1ToV1beta1(resp *admiv1.AdmissionResponse) *admiv1beta1.AdmissionResponse {
var patchType *admiv1beta1.PatchType
Expand Down
12 changes: 4 additions & 8 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,20 +476,16 @@ func start(log log.Component,
StopCh: stopCh,
}

validatingWebhooks, mutatingWebhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa)
webhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa)
if err != nil {
pkglog.Errorf("Could not start admission controller: %v", err)
} else {
// Webhook and secret controllers are started successfully
// Setup the k8s admission webhook server
// Set up the k8s admission webhook server
server := admissioncmd.NewServer()

for _, webhookConf := range validatingWebhooks {
server.RegisterValidatingWebhook(webhookConf.Endpoint(), webhookConf.Name(), webhookConf.ValidateFunc(), apiCl.DynamicCl, apiCl.Cl)
}

for _, webhookConf := range mutatingWebhooks {
server.RegisterMutatingWebhook(webhookConf.Endpoint(), webhookConf.Name(), webhookConf.MutateFunc(), apiCl.DynamicCl, apiCl.Cl)
for _, webhookConf := range webhooks {
server.RegisterWebhook(webhookConf.Endpoint(), webhookConf.Name(), webhookConf.WebhookType(), webhookConf.WebhookFunc(), apiCl.DynamicCl, apiCl.Cl)
}

// Start the k8s admission webhook server
Expand Down
10 changes: 9 additions & 1 deletion pkg/clusteragent/admission/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,24 @@
// Package common defines constants and types used by the Admission Controller.
package common

// WebhookType is the type of the webhook.
type WebhookType int

const (
// EnabledLabelKey pod label to disable/enable mutations at the pod level.
EnabledLabelKey = "admission.datadoghq.com/enabled"

// InjectionModeLabelKey pod label to chose the config injection at the pod level.
// InjectionModeLabelKey pod label to choose the config injection at the pod level.
InjectionModeLabelKey = "admission.datadoghq.com/config.mode"

// LibVersionAnnotKeyFormat is the format of the library version annotation
LibVersionAnnotKeyFormat = "admission.datadoghq.com/%s-lib.version"

// LibConfigV1AnnotKeyFormat is the format of the library config annotation
LibConfigV1AnnotKeyFormat = "admission.datadoghq.com/%s-lib.config.v1"

// ValidatingWebhook is type for ValidatingWebhook.
ValidatingWebhook WebhookType = iota
// MutatingWebhook is type for MutatingWebhook.
MutatingWebhook WebhookType = iota
)
Loading

0 comments on commit b88e005

Please sign in to comment.