adding pre-request plugin to requestcontrol layer #1004
@@ -22,6 +22,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"net"
 	"strconv"
 	"time"
 
@@ -54,6 +55,7 @@ func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, s
 		datastore: datastore,
 		scheduler: scheduler,
 		saturationDetector: saturationDetector,
+		preRequestPlugins: config.preRequestPlugins,
 		postResponsePlugins: config.postResponsePlugins,
 	}
 }
@@ -63,14 +65,15 @@ type Director struct {
 	datastore datastore.Datastore
 	scheduler Scheduler
 	saturationDetector SaturationDetector
+	preRequestPlugins []PreRequest
 	postResponsePlugins []PostResponse
 }
 
 // HandleRequest orchestrates the request lifecycle:
 // 1. Parses request details.
-// 2. Calls PreDispatch for admission control.
-// 3. Calls Dispatch (which calls Scheduler) if request is approved.
-// 4. Calls PostDispatch to populate RequestContext with results.
+// 2. Calls admitRequest for admission control.
+// 3. Calls Scheduler.Schedule if request is approved.
+// 4. Calls prepareRequest to populate RequestContext with results and call PreRequest plugins.
 //
 // It always returns the requestContext even in the error case, as the request context is used in error handling.
 func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
@@ -117,42 +120,39 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 		Prompt: prompt,
 		Headers: reqCtx.Request.Headers,
 	}
-	logger = logger.WithValues(
-		"model", reqCtx.Model,
-		"resolvedTargetModel", reqCtx.ResolvedTargetModel,
-		"criticality", requestCriticality,
-	)
+	logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
 	ctx = log.IntoContext(ctx, logger)
 	logger.V(logutil.DEBUG).Info("LLM request assembled")
 
-	// --- 2. Saturation Check ---
-	preDispatchErr := d.PreDispatch(ctx, reqCtx, requestCriticality)
-	if preDispatchErr != nil {
-		return reqCtx, preDispatchErr
+	// --- 2. Admission Control check --
+	if err := d.admitRequest(ctx, requestCriticality); err != nil {
+		return reqCtx, err
 	}
 
-	// --- 3. Dispatch (Calls Scheduler) ---
-	results, dispatchErr := d.Dispatch(ctx, reqCtx.SchedulingRequest)
-	if dispatchErr != nil {
-		return reqCtx, dispatchErr
+	// --- 3. Call Scheduler ---
+	results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest)
+	if err != nil {
+		return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
 	}
 
-	// --- 4. PostDispatch (Populates RequestContext) ---
-	// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
-	// Attach the port number.
-	reqCtx, postDispatchErr := d.PostDispatch(ctx, reqCtx, results)
-	if postDispatchErr != nil {
-		return reqCtx, postDispatchErr
+	// --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) ---
+	// Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number.
+	// Invoke PreRequest registered plugins.
+	reqCtx, err = d.prepareRequest(ctx, reqCtx, results)
+	if err != nil {
+		return reqCtx, err
 	}
 
 	return reqCtx, nil
 }
 
-// PreDispatch handles admission control before dispatch.
-func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestContext, reqCriticality v1alpha2.Criticality) error {
+// admitRequest handles admission control to decide whether or not to accept the request
+// based on the request criticality and system saturation state.
+func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality) error {
 	logger := log.FromContext(ctx)
 
-	if reqCriticality == v1alpha2.Critical {
+	if requestCriticality == v1alpha2.Critical {
 		logger.V(logutil.DEBUG).Info("Critical request bypassing saturation check.")
 		return nil
 	}
 
@@ -164,24 +164,14 @@ func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestCont
 			Msg: "system saturated, non-critical request dropped",
 		}
 	}
-	return nil
-}
-
-// Dispatch runs one or many scheduling cycles.
-func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
-	var err error
-	res, err := d.scheduler.Schedule(ctx, llmReq)
-	if err != nil {
-		return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
-	}
-
-	return res, nil // TODO handle multi cycle result after defining the PostDispatch extension point
+	return nil
 }
 
-// PostDispatch populates the RequestContext based on scheduling results.
-func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
+// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
+// for allowing plugging customized logic based on the scheduling results.
+func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)
+	// currently only get a single result. Will refactor to pluggably implement the PostSchedule
 	if result == nil || len(result.ProfileResults) == 0 {
 		return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
 	}
@@ -192,13 +182,16 @@ func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestCon
 	if err != nil {
 		return reqCtx, err
 	}
+	targetPort := int(pool.Spec.TargetPortNumber)
 
-	endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
+	endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
 	logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
 
 	reqCtx.TargetPod = targetPod
 	reqCtx.TargetEndpoint = endpoint
 
+	d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
+
 	return reqCtx, nil
 }
 
@@ -254,6 +247,16 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
 	return ""
 }
 
+func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult,
+	targetPort int) {
+	for _, plugin := range d.preRequestPlugins {
+		log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.Name())
+		before := time.Now()
+		plugin.PreRequest(ctx, request, schedulingResult, targetPort)
+		metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.Name(), time.Since(before))
+	}
+}
+
 func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
 	for _, plugin := range d.postResponsePlugins {
 		log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
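A note on the endpoint construction change above: net.JoinHostPort is not just a cosmetic replacement for string concatenation, because it brackets IPv6 hosts so the resulting endpoint can still be split back into host and port. A small self-contained sketch of the difference (the address and port values below are made up for illustration):

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	targetPort := 8000

	// IPv4 host: both approaches produce the same endpoint.
	fmt.Println(net.JoinHostPort("10.0.0.12", strconv.Itoa(targetPort))) // 10.0.0.12:8000
	fmt.Println("10.0.0.12" + ":" + strconv.Itoa(targetPort))            // 10.0.0.12:8000

	// IPv6 host: JoinHostPort adds the required brackets, while plain
	// concatenation yields a string that cannot be split into host and
	// port unambiguously.
	fmt.Println(net.JoinHostPort("fd00::12", strconv.Itoa(targetPort))) // [fd00::12]:8000
	fmt.Println("fd00::12" + ":" + strconv.Itoa(targetPort))            // fd00::12:8000
}
```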
@@ -25,9 +25,17 @@ import (
 )
 
 const (
+	PreRequestPluginType = "PreRequest"
 	PostResponsePluginType = "PostResponse"
 )
 
+// PreRequest is called by the director after a getting result from scheduling layer and
+// before a request is sent to the selected model server.
+type PreRequest interface {
+	plugins.Plugin
+	PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int)
+}
+
 // PostResponse is called by the director after a successful response was sent.
 // The given pod argument is the pod that served the request.
 type PostResponse interface {

Review thread on the PreRequest method:

- "Should we pass targetPod as an argument instead of [...]?"
- "with the new scheduler design, there are two different results under the scheduling package. [...] to your question - passing [...] to summarize, we need to keep [...]"
- "Thanks, this makes sense"
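To make the new extension point concrete, below is a minimal sketch of a PreRequest implementation. It relies only on what the diff shows: plugins.Plugin providing Name() (assumed here to be its only requirement, since that is the method the director calls), the PreRequest method signature, and the ProfileResults field that prepareRequest checks. The plugin type, its name, and what it logs are illustrative and not part of this PR, and imports from the repo's plugins, scheduling types, and logging packages are omitted because their exact paths are not shown in this excerpt.

```go
// loggingPreRequest is a hypothetical PreRequest plugin that only records
// where the request is about to be sent; it is an illustration of the
// interface above, not code from this PR.
type loggingPreRequest struct{}

// Name satisfies plugins.Plugin (assumed here to require only Name).
func (p *loggingPreRequest) Name() string { return "logging-pre-request" }

// PreRequest is invoked by the director after scheduling and before the
// request is forwarded to the selected model server.
func (p *loggingPreRequest) PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult, targetPort int) {
	log.FromContext(ctx).V(logutil.DEBUG).Info("request scheduled",
		"targetPort", targetPort,
		"profiles", len(schedulingResult.ProfileResults))
}
```

Such a plugin would reach the Director through the config passed to NewDirectorWithConfig (the diff copies config.preRequestPlugins into the Director struct); the helper used to populate that field is not shown in this excerpt. Note that, as the signature shows, the plugin receives the full SchedulingResult plus the resolved target port rather than a single pre-resolved pod.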