Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions pkg/epp/flowcontrol/types/README.md

This file was deleted.

37 changes: 33 additions & 4 deletions pkg/epp/flowcontrol/types/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,38 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package types defines the core data structures and service contracts for the Flow Control system.
// Package types defines the fundamental data structures, interfaces, and errors that form the vocabulary of the Flow
// Control system. It establishes the core data contracts for the request lifecycle, from initial submission to final,
// reportable outcome.
//
// It establishes the "vocabulary" of the system, defining the objects that are passed between the main controller,
// policies, and queue plugins. The central data model revolves around the lifecycle of a request, which is
// progressively wrapped in interfaces that provide an enriched, read-only view of its state.
// # The Request Lifecycle
//
// The primary entry point to the `controller.FlowController` is the synchronous `EnqueueAndWait` method. The types in
// this package are designed to model a request's journey through this blocking call.
//
// 1. A client first constructs an object that implements the `FlowControlRequest` interface. This is the "raw" input,
// containing the essential data for the request, such as its `FlowID` and `ByteSize`. This object is passed to
// `EnqueueAndWait`.
//
// 2. Internally, the `controller.FlowController` wraps the `FlowControlRequest` in an object that implements the
// `QueueItemAccessor` interface. This is an enriched, read-only view used by policies and queues. It adds internal
// metadata like `EnqueueTime` and `EffectiveTTL`.
//
// 3. If the request is accepted and added to a `framework.SafeQueue`, the queue creates a `QueueItemHandle`. This is
// an opaque, queue-specific handle that the controller uses to perform targeted operations (like removal) without
// needing to know the queue's internal implementation details.
//
// 4. The `EnqueueAndWait` method blocks until the request reaches a terminal state. This final state is reported using
// a `QueueOutcome` enum and a corresponding `error`.
//
// # Final State Reporting: Outcomes and Errors
//
// This combination of a concise enum and a detailed error provides a clear, machine-inspectable result.
//
// - `QueueOutcome`: A low-cardinality enum summarizing the final result (e.g., `QueueOutcomeDispatched`,
// `QueueOutcomeRejectedCapacity`). This is ideal for metrics.
//
// - `error`: For any non-dispatch outcome, a specific sentinel error is returned. These are nested to provide rich
// context. Callers can use `errors.Is()` to check for the general class of failure (`ErrRejected` or `ErrEvicted`),
// and then unwrap the error to find the specific cause (e.g., `ErrQueueAtCapacity` or `ErrTTLExpired`).
package types
42 changes: 22 additions & 20 deletions pkg/epp/flowcontrol/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,44 @@ import (
"errors"
)

// --- High Level Queue Outcome Errors ---
// --- High-Level Outcome Errors ---

var (
// ErrRejected is a sentinel error indicating a request was rejected by the Flow Controller *before* being formally
// enqueued. Errors returned by `FlowController.EnqueueAndWait()` that signify pre-queue rejection will wrap this
// error.
// ErrRejected is a sentinel error indicating a request was rejected by the `controller.FlowController` *before* being
// formally enqueued. Errors returned by `FlowController.EnqueueAndWait()` that signify pre-queue rejection will wrap
// this error.
//
// Callers should use `errors.Is(err, ErrRejected)` to check for this general class of failure.
ErrRejected = errors.New("request rejected pre-queue")

// ErrEvicted is a sentinel error indicating a request was removed from a queue *after* being successfully enqueued,
// but for reasons other than successful dispatch (e.g., TTL expiry, displacement).
// Errors returned by `FlowController.EnqueueAndWait()` that signify post-queue eviction will wrap this error.
// but for reasons other than successful dispatch (e.g., TTL expiry, displacement). Errors returned by
// `FlowController.EnqueueAndWait()` that signify post-queue eviction will wrap this error.
//
// Callers should use `errors.Is(err, ErrEvicted)` to check for this general class of failure.
ErrEvicted = errors.New("request evicted from queue")
)

// --- Pre-Enqueue Rejection Errors ---
// Errors that can occur before a request is formally added to a `framework.SafeQueue`.
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrRejected`.

// The following errors can occur before a request is formally added to a `framework.SafeQueue`. When returned by
// `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrRejected`.
var (
// ErrNilRequest indicates that a nil `types.FlowControlRequest` was provided.
ErrNilRequest = errors.New("FlowControlRequest cannot be nil")

// ErrFlowIDEmpty indicates that a flow ID was empty when one was required.
ErrFlowIDEmpty = errors.New("flow ID cannot be empty")

// ErrQueueAtCapacity indicates that a request could not be enqueued because queue capacity limits were met and
// displacement (if applicable) failed to make space.
// ErrQueueAtCapacity indicates that a request could not be enqueued because queue capacity limits were met.
ErrQueueAtCapacity = errors.New("queue at capacity and displacement failed to make space")
)

// --- Post-Enqueue Eviction Errors ---
// Errors that occur when a request, already in a `framework.SafeQueue`, is removed for reasons other than dispatch.
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrEvicted`.

// The following errors occur when a request, already in a `framework.SafeQueue`, is removed for reasons other than
// dispatch. When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by
// `ErrEvicted`.
var (
// ErrTTLExpired indicates a request was evicted from a queue because its effective Time-To-Live expired.
ErrTTLExpired = errors.New("request TTL expired")
Expand All @@ -61,17 +66,14 @@ var (
// `FlowControlRequest.Context()`) was cancelled. This error typically wraps the underlying `context.Canceled` or
// `context.DeadlineExceeded` error.
ErrContextCancelled = errors.New("request context cancelled")

// ErrDisplaced indicates a request was evicted from a queue because it was chosen as a victim by a displacement
// policy to make space for another request.
ErrDisplaced = errors.New("request displaced")
)

// --- General FlowController Errors ---
// General runtime errors for the Flow Controller.
// --- General `controller.FlowController` Errors ---

var (
// ErrFlowControllerShutdown indicates that an operation could not complete or an item was evicted because the Flow
// Controller is shutting down or has stopped.
// ErrFlowControllerShutdown indicates that an operation could not complete or an item was evicted because the
// `controller.FlowController` is shutting down or has stopped.
//
// When returned by `FlowController.EnqueueAndWait()`, this will be wrapped by `ErrRejected` (if rejection happens
// before internal queuing) or `ErrEvicted` (if eviction happens after internal queuing).
ErrFlowControllerShutdown = errors.New("FlowController is shutting down")
Expand Down
14 changes: 5 additions & 9 deletions pkg/epp/flowcontrol/types/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ limitations under the License.

package types

// FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority.
//
// It acts as the registration key for a flow within the `contracts.FlowRegistry`.
// FlowSpecification defines the complete configuration for a single logical flow.
// It is the data contract used by the `contracts.FlowRegistry` to create and manage the lifecycle of queues and
// policies.
type FlowSpecification struct {
// ID returns the unique name or identifier for this logical flow, corresponding to the value from
// `FlowControlRequest.FlowID()`.
// ID is the unique identifier for this flow (e.g., model name, tenant ID).
ID string

// Priority returns the numerical priority level currently associated with this flow within the
// `contracts.FlowRegistry`.
//
// Convention: Lower numerical values indicate higher priority.
// Priority is the numerical priority level for this flow. Lower values indicate higher priority.
Priority uint
}
25 changes: 19 additions & 6 deletions pkg/epp/flowcontrol/types/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ type MockFlowControlRequest struct {
IDV string
}

// NewMockFlowControlRequest creates a new `MockFlowControlRequest` instance.
func NewMockFlowControlRequest(byteSize uint64, id, flowID string, ctx context.Context) *MockFlowControlRequest {
if ctx == nil {
ctx = context.Background()
}
return &MockFlowControlRequest{
ByteSizeV: byteSize,
IDV: id,
FlowIDV: flowID,
Ctx: ctx,
}
}

func (m *MockFlowControlRequest) Context() context.Context { return m.Ctx }
func (m *MockFlowControlRequest) FlowID() string { return m.FlowIDV }
func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV }
Expand Down Expand Up @@ -82,12 +95,12 @@ var _ types.QueueItemAccessor = &MockQueueItemAccessor{}
func NewMockQueueItemAccessor(byteSize uint64, reqID, flowID string) *MockQueueItemAccessor {
return &MockQueueItemAccessor{
EnqueueTimeV: time.Now(),
OriginalRequestV: &MockFlowControlRequest{
IDV: reqID,
FlowIDV: flowID,
ByteSizeV: byteSize,
Ctx: context.Background(),
},
OriginalRequestV: NewMockFlowControlRequest(
byteSize,
reqID,
flowID,
context.Background(),
),
HandleV: &MockQueueItemHandle{},
}
}
30 changes: 14 additions & 16 deletions pkg/epp/flowcontrol/types/outcomes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,33 @@ package types

import "strconv"

// QueueOutcome represents the high-level final state of a request's lifecycle within the Flow Controller.
// QueueOutcome represents the high-level final state of a request's lifecycle within the `controller.FlowController`.
//
// It is returned by `FlowController.EnqueueAndWait()` along with a corresponding error. This enum is designed to be a
// low-cardinality label ideal for metrics, while the error provides fine-grained details for non-dispatched outcomes.
type QueueOutcome int

const (
// QueueOutcomeDispatched indicates the request was successfully processed by the Flow Controller and unblocked for
// the caller to proceed.
// QueueOutcomeNotYetFinalized indicates the request has not yet been finalized by the `controller.FlowController`.
// This is an internal default value and should never be returned by `FlowController.EnqueueAndWait()`.
QueueOutcomeNotYetFinalized QueueOutcome = iota

// QueueOutcomeDispatched indicates the request was successfully processed by the `controller.FlowController` and
// unblocked for the caller to proceed.
// The associated error from `FlowController.EnqueueAndWait()` will be nil.
QueueOutcomeDispatched QueueOutcome = iota
QueueOutcomeDispatched

// --- Pre-Enqueue Rejection Outcomes (request never entered a `framework.SafeQueue`) ---
// For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrRejected`.

// QueueOutcomeRejectedCapacity indicates rejection because queue capacity limits were met and displacement (if
// applicable) failed to make space.
// QueueOutcomeRejectedCapacity indicates rejection because queue capacity limits were met.
// The associated error will wrap `ErrQueueAtCapacity` (and `ErrRejected`).
QueueOutcomeRejectedCapacity

// QueueOutcomeRejectedOther indicates rejection for reasons other than capacity before the request was formally
// enqueued.
// The specific underlying cause can be determined from the associated error (e.g., a nil request, an unregistered
// flow ID, or a controller shutdown), which will be wrapped by `ErrRejected`.
// flow ID, or controller shutdown), which will be wrapped by `ErrRejected`.
QueueOutcomeRejectedOther

// --- Post-Enqueue Eviction Outcomes (request was in a `framework.SafeQueue` but not dispatched) ---
Expand All @@ -57,21 +60,18 @@ const (
// `context.DeadlineExceeded` error) (and `ErrEvicted`).
QueueOutcomeEvictedContextCancelled

// QueueOutcomeEvictedDisplaced indicates eviction from a queue to make space for another request due to a
// displacement policy.
// The associated error will wrap `ErrDisplaced` (and `ErrEvicted`).
QueueOutcomeEvictedDisplaced

// QueueOutcomeEvictedOther indicates eviction from a queue for reasons not covered by more specific eviction
// outcomes.
// The specific underlying cause can be determined from the associated error (e.g., a controller shutdown while the
// item was queued), which will be wrapped by `ErrEvicted`.
// The specific underlying cause can be determined from the associated error (e.g., controller shutdown while the item
// was queued), which will be wrapped by `ErrEvicted`.
QueueOutcomeEvictedOther
)

// String returns a human-readable string representation of the QueueOutcome.
func (o QueueOutcome) String() string {
switch o {
case QueueOutcomeNotYetFinalized:
return "NotYetFinalized"
case QueueOutcomeDispatched:
return "Dispatched"
case QueueOutcomeRejectedCapacity:
Expand All @@ -82,8 +82,6 @@ func (o QueueOutcome) String() string {
return "EvictedTTL"
case QueueOutcomeEvictedContextCancelled:
return "EvictedContextCancelled"
case QueueOutcomeEvictedDisplaced:
return "EvictedDisplaced"
case QueueOutcomeEvictedOther:
return "EvictedOther"
default:
Expand Down
19 changes: 10 additions & 9 deletions pkg/epp/flowcontrol/types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"time"
)

// FlowControlRequest is the contract for an incoming request submitted to the Flow Controller. It represents the "raw"
// user-provided data and context for a single unit of work.
// FlowControlRequest is the contract for an incoming request submitted to the `controller.FlowController`. It
// represents the "raw" user-provided data and context for a single unit of work.
//
// An object implementing this interface is the primary input to `FlowController.EnqueueAndWait()`. The controller then
// wraps this object with its own internal structures (which implement `QueueItemAccessor`) to manage the request's
Expand All @@ -34,12 +34,12 @@ type FlowControlRequest interface {
Context() context.Context

// FlowID returns the unique identifier for the flow this request belongs to (e.g., model name, tenant ID). The
// `controller.FlowController` uses this ID, in conjunction with the flow's registered priority, to look up the
// active `contracts.ManagedQueue` from the `contracts.FlowRegistry`'s `contracts.RegistryShard`.
// `controller.FlowController` uses this ID to look up the active `contracts.ManagedQueue` and configured
// `framework.IntraFlowDispatchPolicy` from a `contracts.RegistryShard`.
FlowID() string

// ByteSize returns the request's size in bytes (e.g., prompt size). This is used by the `controller.FlowController`
// and for managing byte-based capacity limits and for `contracts.FlowRegistry` statistics.
// for managing byte-based capacity limits and for `contracts.FlowRegistry` statistics.
ByteSize() uint64

// InitialEffectiveTTL returns the suggested Time-To-Live for this request.
Expand Down Expand Up @@ -78,11 +78,12 @@ type QueueItemHandle interface {
IsInvalidated() bool
}

// QueueItemAccessor provides the internal, enriched, read-only view of a request being managed within the Flow
// Controller's queues. It is the primary interface through which `framework.SafeQueue` implementations and policy
// plugins interact with request data and its associated flow control metadata.
// QueueItemAccessor provides the internal, enriched, read-only view of a request being managed within the
// controller.FlowController`'s queues. It is the primary interface through which `framework.SafeQueue` implementations
// and policy plugins interact with request data and its associated flow control metadata.
//
// The Flow Controller creates an object that implements this interface by wrapping an incoming `FlowControlRequest`.
// The `controller.FlowController` creates an object that implements this interface by wrapping an incoming
// `FlowControlRequest`.
type QueueItemAccessor interface {
// OriginalRequest returns the underlying `FlowControlRequest` that this accessor provides a view of.
// This method serves as an escape hatch, allowing policies or components that are aware of specific
Expand Down