
Commit 0645012

Type-system for flow control.

1 parent aa4989e commit 0645012

24 files changed: +2280 −0 lines changed

pkg/epp/flowcontrol/README.md

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
# Flow Control Module

## Introduction

In a multi-tenant, heterogeneous inference serving environment, managing diverse SLOs and fairness requirements is critical. Today, the serving stack often relies on a simple "best-effort" or FIFO (First-In, First-Out) basis for handling requests. This is insufficient and leads to significant problems:

* **Head-of-Line Blocking**: A long-running, low-priority request can block short, high-priority requests, violating SLOs.
* **Lack of Predictability**: Without proper queuing and prioritization, it's impossible to provide predictable latency guarantees to different tenants.
* **Inability to Handle Saturation**: Under heavy load, the system has no graceful way to manage overload, leading to cascading failures instead of controlled degradation.

The Flow Controller is a sophisticated library designed to solve these problems. It acts as a crucial gatekeeper that decides *if* and *when* a request should proceed to be scheduled. Its primary mission is to enable predictable, fair, and efficient utilization of shared backend resources by enforcing prioritization, applying fairness policies, managing request queuing under saturation, and orchestrating displacement (the eviction of lower-priority queued items to make space for higher-priority ones).

It is designed for extensibility, allowing custom logic for policies and queuing mechanisms to be plugged into a robust, high-performance orchestration engine.

### Role in the Gateway API Inference Extension

Within the Gateway API Inference Extension's Endpoint Picker (EPP), the Flow Controller acts as a crucial gatekeeper between the Routing and Scheduling layers. It decides *if* and *when* a request, already assigned to a logical flow (e.g., a specific workload or tenant), should proceed to be scheduled onto a backend resource. It is the primary mechanism for managing diverse SLOs, ensuring fairness among competing workloads, and maintaining system stability under high load.

### High-Level Architecture

The following diagram illustrates the high-level dependency model and request flow for the system. It shows how concurrent client requests are managed by the central `FlowController`, which in turn relies on a set of decoupled components to make its decisions. Each component package in this module will contain its own more detailed architectural diagrams.

```mermaid
graph LR
    %% Style Definitions
    classDef default fill:#fff,stroke:#333,stroke-width:1.5px,color:#000;
    classDef client fill:#dcfce7,stroke:#333;
    classDef system_entry fill:#fef9c3,stroke:#333;
    classDef downstream_ok fill:#dbeafe,stroke:#333;
    classDef downstream_err fill:#fee2e2,stroke:#333;

    %% Client Goroutines (Fan-In)
    subgraph Client Goroutines
        direction TB
        R1(Goroutine 1);
        R2(Goroutine N);
    end

    %% Flow Control System
    subgraph Flow Control System
        C{Flow Controller Engine};

        subgraph Internal Interactions
            direction LR
            D(Ports) -- "abstracts state" --> E(Flow Registry);
            D -- "abstracts load" --> SD(Saturation Detector);
            E -- "configures" --> F(Framework);
            F -- "defines" --> P(Plugins: Queues & Policies);
        end

        C -- "Orchestrates via<br>abstractions" --> D;
    end

    %% Downstream Actions (Fan-Out)
    subgraph Downstream Actions
        direction TB
        A1(Outcome: Dispatched<br>Proceed to Scheduler);
        A2(Outcome: Rejected<br>Return Error);
    end

    %% Connections
    R1 -- "calls & blocks" --> C;
    R2 -- "calls & blocks" --> C;
    C -- "unblocks 'goroutine 1'" --> A1;
    C -- "unblocks 'goroutine N'" --> A2;

    %% Apply Classes
    class R1,R2 client;
    class C system_entry;
    class A1 downstream_ok;
    class A2 downstream_err;
    class D,E,F,P,SD default;
```

## Architectural Pillars

The Flow Controller framework is built on several key components that work in concert. This architecture is designed to be highly modular and scalable, with clear separation of concerns. For a deep dive into the specific design choices and their justifications, please refer to the detailed documentation within the relevant sub-packages.

1. **The `FlowController` Engine (`./controller`)**: The central, sharded orchestrator responsible for the main request processing loop. It manages a pool of workers that distribute incoming requests, apply policies, and dispatch requests to the backends. Its design focuses on high throughput and backpressure.

2. **Pluggable `Policy` Framework (`./framework`)**: This defines the core interfaces for all pluggable logic. It features a two-tier policy system for `InterFlow` (decisions *between* different flows) and `IntraFlow` (decisions *within* a single flow) logic, covering both request dispatch and displacement (a sketch of this plugin surface follows this list).

3. **Extensible `SafeQueue` System (`./framework`)**: This defines the `framework.SafeQueue` interface for concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface.

4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the `FlowController` workers to enable parallel operation with minimal lock contention.

5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies, following a "Ports and Adapters" architectural style.
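
To make pillars 2 and 3 concrete, the sketch below shows the rough *shape* of the plugin surface described above. The interface names echo the prose, but the method sets and signatures are simplified assumptions for illustration only; the authoritative definitions live in `./framework`.

```go
package framework_sketch

// Illustrative sketch only; the real interfaces in ./framework carry
// capability negotiation, richer item metadata, and error contracts.

// QueueItem is a queued request together with the metadata policies need.
type QueueItem interface {
	ByteSize() uint64
}

// SafeQueue is a concurrent-safe request store. Concrete implementations
// (FIFO, priority heap, ...) advertise their behavior via capabilities
// rather than changing this interface.
type SafeQueue interface {
	Add(item QueueItem) error
	PeekHead() (QueueItem, bool)
	Remove(item QueueItem) bool
	Len() int
	ByteSize() uint64
}

// InterFlowDispatchPolicy decides WHICH flow's queue to service next within a
// priority band (a decision *between* flows).
type InterFlowDispatchPolicy interface {
	SelectQueue(candidates []SafeQueue) (SafeQueue, error)
}

// IntraFlowDispatchPolicy decides WHICH item to dispatch from the selected
// queue (a decision *within* a single flow).
type IntraFlowDispatchPolicy interface {
	SelectItem(queue SafeQueue) (QueueItem, error)
}
```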
Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
# FlowController Engine: Architecture and Design Rationale

This document provides an in-depth explanation of the `FlowController` engine, its core operational mechanisms, and the architectural principles that guide its design for high-throughput, scalable request management.

## Overview

The `FlowController` is the central processing engine of the flow control system. It is a sharded, high-throughput component responsible for managing the lifecycle of all incoming requests—from initial submission via the synchronous `EnqueueAndWait` method to a terminal outcome (dispatch, rejection, or eviction). It achieves this by orchestrating the `FlowRegistry`, pluggable `Policy` framework, and `SaturationDetector` to make continuous, state-aware decisions.

## Key Operational Mechanisms

* **Request Ingress (`EnqueueAndWait`)**: The primary entry point for all requests. This method is synchronous and blocks the calling goroutine until the Flow Controller determines a final outcome for the request. The rationale for this model is detailed below.

* **Request Distribution**: A top-level distributor receives requests and uses a **Join-the-Shortest-Queue-by-Bytes (JSQ-Bytes)** algorithm to assign each request to a specific worker shard, balancing the load across the system.

* **The Worker Processing Loop (`run`)**: Each `shardProcessor` instance executes the main processing loop, which is the hot path for request orchestration on its shard. The loop constantly interleaves accepting new requests with attempting to dispatch already-queued requests. This design is a deliberate strategy for **contention management**. With potentially many (`M`) concurrent goroutines calling `EnqueueAndWait` and `N` worker goroutines dispatching, this interleaving ensures that the channel from the distributor to the worker neither becomes a deep, unmanaged buffer nor starves dispatch attempts, preserving backpressure and responsiveness.

* **Dispatch and Saturation Gating**: The dispatch process iterates through configured priority bands, from highest to lowest. Before attempting to dispatch from any queue, the worker consults the `SaturationDetector.IsSaturated()` method. If the system is saturated, dispatch is throttled to prevent backend overload. If unsaturated, the worker invokes the band's configured `InterFlowDispatchPolicy` and `IntraFlowDispatchPolicy` to select and dispatch an item (see the sketch after this list).

* **Error Handling Philosophy**: The engine employs a robust, two-tiered error handling strategy to isolate failures and maximize system availability:
  * **"Fail Open" for Priority Bands (Inter-Flow):** If an inter-flow policy fails, the worker logs the error, **skips that priority band** for the current cycle, and continues to the next, promoting work conservation.
  * **"Fail Close" for a Band (Intra-Flow):** If an intra-flow operation fails after a queue has been selected, the worker **ceases further processing within that entire priority band** for the cycle to prevent a stateless inter-flow policy from repeatedly selecting a known-problematic queue.
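
The sketch below ties the gating and error-handling rules above together in one dispatch pass. All types here are simplified stand-ins invented for illustration; the real worker in `./controller` operates on the registry's shard view and the framework interfaces.

```go
package controller_sketch

import "log"

// Simplified stand-ins for illustration only.
type item struct{ flowID string }

type queue struct{ items []*item }

type band struct {
	priority  int
	queues    []*queue
	interFlow func(qs []*queue) (*queue, error) // decision between flows
	intraFlow func(q *queue) (*item, error)     // decision within a flow
}

type saturationDetector interface{ IsSaturated() bool }

// dispatchCycle sketches one pass of a worker's loop: gate on saturation,
// then walk priority bands from highest to lowest, failing open across bands
// and failing closed within a band.
func dispatchCycle(sd saturationDetector, bands []band, dispatch func(*item)) {
	if sd.IsSaturated() {
		return // throttle all dispatch while the backends are saturated
	}
	for _, b := range bands { // bands ordered highest priority first
		q, err := b.interFlow(b.queues)
		if err != nil {
			// "Fail open": log, then skip only this band for the current cycle.
			log.Printf("inter-flow policy failed on band %d: %v", b.priority, err)
			continue
		}
		if q == nil {
			continue // nothing queued in this band
		}
		it, err := b.intraFlow(q)
		if err != nil || it == nil {
			// "Fail close": stop work in this band for the cycle so a stateless
			// inter-flow policy cannot keep re-selecting a problematic queue.
			continue
		}
		dispatch(it) // unblocks the goroutine waiting in EnqueueAndWait
		return       // this sketch dispatches at most one item per cycle
	}
}
```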

---

## Architectural Deep Dive 1: The `EnqueueAndWait` Model

A fundamental design choice is the synchronous, blocking `EnqueueAndWait` method. In the context of the Gateway API Inference Extension's Endpoint Picker (EPP), which operates as an Envoy External Processing (`ext_proc`) server, this model is deliberately chosen for its simplicity and robustness (a caller-side sketch follows the list below).

* **Alignment with the `ext_proc` Request Lifecycle**: The `ext_proc` protocol is stream-based. A single goroutine within the EPP manages the stream for a given HTTP request and is ultimately responsible for telling Envoy how to proceed. `EnqueueAndWait` fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has the definitive outcome. It can then immediately act on that outcome (e.g., proceed to the scheduler or return an error to Envoy), maintaining clear request-goroutine affinity.

* **Simplified State Management**: The state of a "waiting" request is implicitly managed by the blocked goroutine's stack and its `context.Context`. The Flow Controller only needs to signal this specific goroutine to unblock it. An alternative, non-blocking handoff model would require complex intermediate data structures, explicit state machines, and correlation logic to route a decision back to the original request context.

* **Direct Backpressure Propagation**: If queues are full and displacement fails, `EnqueueAndWait` returns an `ErrQueueAtCapacity`. This provides immediate, direct backpressure to the earliest point of contact, preventing the system from accepting work it cannot handle.

* **Clearer Error Handling**: When `EnqueueAndWait` returns an error, the original goroutine in charge of the `ext_proc` stream can immediately formulate the correct HTTP response. A staged, asynchronous model would require a more complex mechanism to communicate a failure from a later stage back to the goroutine managing the Envoy stream.
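
A minimal caller-side sketch of this model follows. The `flowController` interface, the `ErrQueueAtCapacity` value, and the error mapping are assumptions made for illustration; the real EPP integration and error taxonomy live elsewhere in the module.

```go
package controller_sketch

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the real API.
var ErrQueueAtCapacity = errors.New("queue at capacity and displacement failed")

type flowController interface {
	// EnqueueAndWait blocks the calling goroutine until the request reaches a
	// terminal outcome: dispatched, rejected, or evicted (e.g. TTL expiry).
	EnqueueAndWait(ctx context.Context, flowID string, reqBytes uint64) error
}

// handleRequest illustrates request-goroutine affinity: the goroutine that owns
// the ext_proc stream blocks on the flow controller and translates the outcome
// directly into the response it will send back to Envoy.
func handleRequest(ctx context.Context, fc flowController, flowID string, reqBytes uint64) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	switch err := fc.EnqueueAndWait(ctx, flowID, reqBytes); {
	case err == nil:
		return nil // dispatched: proceed to the scheduling layer
	case errors.Is(err, ErrQueueAtCapacity):
		return fmt.Errorf("reject immediately (backpressure to the client): %w", err)
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("request expired while queued: %w", err)
	default:
		return fmt.Errorf("flow control failed: %w", err)
	}
}
```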

---

## Architectural Deep Dive 2: The Sharded Model

The design of the Flow Controller is built on a sharded architecture to enable parallel processing and prevent the central dispatch loop from becoming a bottleneck at high request rates. This choice has profound implications for state management, fairness, and request distribution.

### The Sharded Architecture and its Implications

The `FlowController` consists of a top-level manager and a pool of independent `shardProcessor` workers. The `FlowRegistry` guarantees that every logical flow is represented by a distinct queue instance on every active shard. This creates `N` parallel instances for each flow, managed by `N` independent workers.

This architecture trades deterministic global state for high throughput and scalability. The key challenge, and the system's most critical assumption, revolves around ensuring this distributed model can still achieve global fairness objectives.

#### The Critical Assumption: Workload Homogeneity Within Flows

The effectiveness of the sharded model hinges on a critical assumption: **while the system as a whole manages a heterogeneous set of flows, the traffic *within a single logical flow* is assumed to be roughly homogeneous in its characteristics over a sliding time window.** A logical flow is intended to represent a single workload or tenant; therefore, the most unpredictable variables (like decode behavior based on non-structural request characteristics) are expected to be statistically similar *within* that flow.

### Request Distribution: Join the Shortest Queue by Bytes (JSQ-Bytes)

To make the critical assumption as robust as possible, the `FlowController` uses a **Join the Shortest Queue by Bytes (JSQ-Bytes)** algorithm to distribute incoming requests.

* **What JSQ-Bytes Measures**: `ByteSize` is an excellent proxy for the resources the Flow Controller explicitly manages: host memory pressure and queuing capacity. It is also a reasonable proxy for prefill compute time and Head-of-Line (HOL) blocking within the controller itself.
* **Why It's a Better Fit**: The goal of the distributor is not to perfectly predict backend compute time, but to intelligently balance the load at the controller level (a sketch follows this list). JSQ-Bytes achieves this by:
  1. **Reflecting True Load**: It distributes work based on each shard's current queue size in bytes—a direct measure of its memory and capacity congestion.
  2. **Adapting to Real-Time Congestion**: The byte-size of a queue is a real-time signal of a shard's overall congestion. JSQ-Bytes adaptively steers new work away from momentarily struggling workers.
  3. **Hedging Against Assumption Violations**: This adaptive, self-correcting nature makes it a powerful hedge. It doesn't just distribute; it actively *load balances* based on the most relevant feedback available.
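
Below is a sketch of the distributor's shard choice under JSQ-Bytes, under the assumption that each shard exposes its total queued bytes; the field and function names are illustrative, not the module's actual API.

```go
package controller_sketch

// shard is a simplified stand-in exposing the one signal JSQ-Bytes needs:
// the total ByteSize of everything currently queued on that shard.
type shard struct {
	id          int
	queuedBytes uint64
}

// pickShardJSQBytes returns the shard with the fewest queued bytes, steering
// new work away from momentarily congested workers. It returns nil only if no
// shards are active.
func pickShardJSQBytes(shards []*shard) *shard {
	var best *shard
	for _, s := range shards {
		if best == nil || s.queuedBytes < best.queuedBytes {
			best = s
		}
	}
	return best
}
```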

### Stateful Policies in a Sharded Registry

Sharding fundamentally changes how stateful policies achieve global objectives.

* **Shard-Local State**: Policies like a simple Round Robin can operate with purely shard-local state (see the sketch after this list). When the critical assumption holds, the independent actions of these `N` policies result in **emergent, approximate global fairness**.
* **Global State Dependencies**: Achieving true, deterministic global fairness is still possible. However, this requires policies to be designed with a dependency on an external, globally-consistent state store (e.g., a central metrics service for policies that track SLO attainment).
* **The Low-QPS Challenge**: The critical assumption is most stressed by low-QPS flows. Managing the `shard count` is a key operational lever to mitigate this.
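
As an example of a purely shard-local policy, the sketch below shows round-robin inter-flow selection whose only state is a per-shard cursor; run independently on `N` shards, it yields the emergent, approximate fairness described above. The interface shape is an assumption for illustration.

```go
package policy_sketch

import "sync"

// roundRobin is a shard-local inter-flow policy sketch: its only state is a
// cursor owned by the shard it runs on, so no cross-shard coordination is
// needed.
type roundRobin struct {
	mu   sync.Mutex
	next int
}

// SelectQueue returns the index of the next non-empty queue in round-robin
// order, or -1 if every queue on this shard is currently empty.
func (p *roundRobin) SelectQueue(queueLens []int) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	n := len(queueLens)
	for i := 0; i < n; i++ {
		idx := (p.next + i) % n
		if queueLens[idx] > 0 {
			p.next = idx + 1 // advance the shard-local cursor
			return idx
		}
	}
	return -1
}
```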

### The Displacement Strategy: Iterative and Shard-Local

Displacement is a corrective, non-hot-path mechanism activated only when a request cannot be enqueued due to capacity limits. The strategy is designed for robustness and simplicity over theoretical perfection.

#### Why Displacement is Shard-Local

A key principle is that the entire displacement process is **confined to the shard where the request landed**. A global mechanism would require cross-shard locking, re-introducing massive contention and **destroying the scalability benefits of the sharded architecture**. We consciously trade optimal packing efficiency for superior performance.

#### Why Displacement is Iterative, Not Batched

The engine displaces victims one by one, re-evaluating after each removal.

* **The Challenge: State Mutation**: Policies are dynamic, and displacement mutates state. Removing a single victim changes the `ByteSize` and `Len` of a queue. Furthermore, both policies and the underlying `SafeQueue` implementation have internal state that changes upon removal (e.g., a min-heap must re-heapify, changing the next head item).
* **Why Simulation is Intractable**: Because every removal changes the state, pre-calculating a batch of victims would require a slow, complex, serial simulation, which is antithetical to the system's performance goals.
* **The Pragmatic Solution**: We use a fast, initial feasibility check: `(total_bytes_in_lower_bands) * (tolerance_factor) > bytes_needed`. The **tolerance factor** provides a crucial buffer, acknowledging that policies may legitimately choose not to select a victim. This heuristic aims to prevent a costly and futile displacement cascade where a large request churns through many smaller items only to be rejected anyway. A sketch of this check follows the list.
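
The feasibility check quoted above reduces to a one-line comparison; the sketch below spells it out, with the tolerance factor treated as configuration (the concrete value used by the engine is not assumed here).

```go
package controller_sketch

// canAttemptDisplacement sketches the pre-check quoted above. A toleranceFactor
// below 1.0 discounts the bytes nominally available in lower-priority bands,
// building in a buffer for policies that may legitimately decline to select a
// victim; only if the discounted total still exceeds what the incoming request
// needs is an iterative displacement pass started.
func canAttemptDisplacement(bytesInLowerBands, bytesNeeded uint64, toleranceFactor float64) bool {
	return float64(bytesInLowerBands)*toleranceFactor > float64(bytesNeeded)
}
```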
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import "time"

// FlowControllerConfig holds the top-level configuration for a FlowController instance.
type FlowControllerConfig struct {
	// DefaultQueueTTL is the default Time-To-Live applied to requests within queues if the incoming request does not
	// specify its own via InitialEffectiveTTL().
	//
	// Optional: If not set, a reasonable system default (e.g., 30 seconds) will be used.
	DefaultQueueTTL time.Duration

	// ExpiryCleanupInterval is the frequency at which each FlowController worker's background routine checks for and
	// removes expired items from all managed queues in its shard.
	//
	// Optional: If not set or set to zero, a reasonable system default (e.g., 1 second) will be used.
	ExpiryCleanupInterval time.Duration
}
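
For reference, a minimal snippet showing how a caller in the same package might populate this struct; the specific durations are arbitrary examples, and zero values fall back to the defaults noted in the field comments.

```go
package controller

import "time"

// exampleConfig returns an illustrative FlowControllerConfig. Leaving either
// field at its zero value selects the system default described above.
func exampleConfig() FlowControllerConfig {
	return FlowControllerConfig{
		DefaultQueueTTL:       10 * time.Second,       // per-request queueing budget unless overridden via InitialEffectiveTTL()
		ExpiryCleanupInterval: 500 * time.Millisecond, // how often each worker sweeps its shard for expired items
	}
}
```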
