Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 43 additions & 40 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"math/rand"
"net"
"strconv"
"time"

Expand Down Expand Up @@ -54,6 +55,7 @@ func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, s
datastore: datastore,
scheduler: scheduler,
saturationDetector: saturationDetector,
preRequestPlugins: config.preRequestPlugins,
postResponsePlugins: config.postResponsePlugins,
}
}
Expand All @@ -63,14 +65,15 @@ type Director struct {
datastore datastore.Datastore
scheduler Scheduler
saturationDetector SaturationDetector
preRequestPlugins []PreRequest
postResponsePlugins []PostResponse
}

// HandleRequest orchestrates the request lifecycle:
// 1. Parses request details.
// 2. Calls PreDispatch for admission control.
// 3. Calls Dispatch (which calls Scheduler) if request is approved.
// 4. Calls PostDispatch to populate RequestContext with results.
// 2. Calls admitRequest for admission control.
// 3. Calls Scheduler.Schedule if request is approved.
// 4. Calls prepareRequest to populate RequestContext with results and call PreRequest plugins.
//
// It always returns the requestContext even in the error case, as the request context is used in error handling.
func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
Expand Down Expand Up @@ -117,42 +120,39 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
Prompt: prompt,
Headers: reqCtx.Request.Headers,
}
logger = logger.WithValues(
"model", reqCtx.Model,
"resolvedTargetModel", reqCtx.ResolvedTargetModel,
"criticality", requestCriticality,
)

logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
ctx = log.IntoContext(ctx, logger)
logger.V(logutil.DEBUG).Info("LLM request assembled")

// --- 2. Saturation Check ---
preDispatchErr := d.PreDispatch(ctx, reqCtx, requestCriticality)
if preDispatchErr != nil {
return reqCtx, preDispatchErr
// --- 2. Admission Control check --
if err := d.admitRequest(ctx, requestCriticality); err != nil {
return reqCtx, err
}

// --- 3. Dispatch (Calls Scheduler) ---
results, dispatchErr := d.Dispatch(ctx, reqCtx.SchedulingRequest)
if dispatchErr != nil {
return reqCtx, dispatchErr
// --- 3. Call Scheduler ---
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest)
if err != nil {
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}

// --- 4. PostDispatch (Populates RequestContext) ---
// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
// Attach the port number.
reqCtx, postDispatchErr := d.PostDispatch(ctx, reqCtx, results)
if postDispatchErr != nil {
return reqCtx, postDispatchErr
// --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) ---
// Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number.
// Invoke PreRequest registered plugins.
reqCtx, err = d.prepareRequest(ctx, reqCtx, results)
if err != nil {
return reqCtx, err
}

return reqCtx, nil
}

// PreDispatch handles admission control before dispatch.
func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestContext, reqCriticality v1alpha2.Criticality) error {
// admitRequest handles admission control to decide whether or not to accept the request
// based on the request criticality and system saturation state.
func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality) error {
logger := log.FromContext(ctx)

if reqCriticality == v1alpha2.Critical {
if requestCriticality == v1alpha2.Critical {
logger.V(logutil.DEBUG).Info("Critical request bypassing saturation check.")
return nil
}
Expand All @@ -164,24 +164,14 @@ func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestCont
Msg: "system saturated, non-critical request dropped",
}
}
return nil
}

// Dispatch runs one or many scheduling cycles.
func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
var err error
res, err := d.scheduler.Schedule(ctx, llmReq)
if err != nil {
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}

return res, nil // TODO handle multi cycle result after defining the PostDispatch extension point
return nil
}

// PostDispatch populates the RequestContext based on scheduling results.
func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
// for allowing plugging customized logic based on the scheduling results.
func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
logger := log.FromContext(ctx)
// currently only get a single result. Will refactor to pluggably implement the PostSchedule
if result == nil || len(result.ProfileResults) == 0 {
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
}
Expand All @@ -192,13 +182,16 @@ func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestCon
if err != nil {
return reqCtx, err
}
targetPort := int(pool.Spec.TargetPortNumber)

endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)

reqCtx.TargetPod = targetPod
reqCtx.TargetEndpoint = endpoint

d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)

return reqCtx, nil
}

Expand Down Expand Up @@ -254,6 +247,16 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
return ""
}

func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult,
targetPort int) {
for _, plugin := range d.preRequestPlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.Name())
before := time.Now()
plugin.PreRequest(ctx, request, schedulingResult, targetPort)
metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.Name(), time.Since(before))
}
}

func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
for _, plugin := range d.postResponsePlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
Expand Down
8 changes: 8 additions & 0 deletions pkg/epp/requestcontrol/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@ import (
)

const (
PreRequestPluginType = "PreRequest"
PostResponsePluginType = "PostResponse"
)

// PreRequest is called by the director after a getting result from scheduling layer and
// before a request is sent to the selected model server.
type PreRequest interface {
plugins.Plugin
PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass targetPod as an argument instead of schedulingResult. Given the multi-profile scheduler architecture, it's unclear if schedulingResult is the result of one profile or end result.

Copy link
Contributor Author

@nirrozenbaum nirrozenbaum Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the new scheduler design, there are two different results under the scheduling package.
we have:

  • ProfileRunResult - which represents a single profile run result.
  • SchedulingResult - which is a map from profile name to it's ProfileRunResult + a field that specifies the primary profile that should be used in the destination header.

to your question - passing SchedulingResult and not the targetPod is intentional. PreRequest extension point is exactly the place where we can make sense of the multi profile results.
For example, in llm-d and PD use case, this is the place where we wire prefill selected endpoint(s) in a dedicated header when returning the decode selection. This is the way we wire P + D selected endpoints.
example can be found here:
https://github.com/llm-d/llm-d-inference-scheduler/blob/0c49737834fc9f2b5213447437ac4815b1d5a98c/pkg/plugins/pre-request/pd_prerequest.go#L33-L37

to summarize, we need to keep SchedulingResult here and not only targetPod. Otherwise, there is no place to make sense of the results of the profiles other than the primary one.
if one wants to get the targetPod, it can be done same as was done here:

targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPod.GetPod()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this makes sense

}

// PostResponse is called by the director after a successful response was sent.
// The given pod argument is the pod that served the request.
type PostResponse interface {
Expand Down
9 changes: 9 additions & 0 deletions pkg/epp/requestcontrol/request_control_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,24 @@ package requestcontrol
// NewConfig creates a new Config object and returns its pointer.
func NewConfig() *Config {
return &Config{
preRequestPlugins: []PreRequest{},
postResponsePlugins: []PostResponse{},
}
}

// Config provides a configuration for the requestcontrol plugins.
type Config struct {
preRequestPlugins []PreRequest
postResponsePlugins []PostResponse
}

// WithPreRequestPlugins sets the given plugins as the PreRequest plugins.
// If the Config has PreRequest plugins already, this call replaces the existing plugins with the given ones.
func (c *Config) WithPreRequestPlugins(plugins ...PreRequest) *Config {
c.preRequestPlugins = plugins
return c
}

// WithPostResponsePlugins sets the given plugins as the PostResponse plugins.
// If the Config has PostResponse plugins already, this call replaces the existing plugins with the given ones.
func (c *Config) WithPostResponsePlugins(plugins ...PostResponse) *Config {
Expand Down