Skip to content

Conversation

LukeAVanDrie
Copy link
Contributor

@LukeAVanDrie LukeAVanDrie commented Jul 10, 2025

This pull request introduces a pluggable queueing framework to the EPP Flow Control layer. The goal of this change is to establish a robust and extensible system for managing in-flight request storage, which is a prerequisite for implementing advanced scheduling, prioritization, and fairness policies.

This work tracks #674

This new framework is built on a set of core interfaces and includes two initial queue implementations to demonstrate its flexibility.

Key Changes

  • New framework.SafeQueue Interface: Defines a clear contract for concurrent-safe queue implementations. This ensures that any queue plugin can be used interchangeably by the core controller.
  • Pluggable Queue Implementations:
    • listqueue: A simple and efficient FIFO (First-In, First-Out) queue, ideal for standard workloads.
    • maxminheap: A priority queue that provides O(1) access to both the highest and lowest priority items, enabling more sophisticated, policy-driven ordering.
  • Plugin Discovery and Registration: A factory pattern allows new queue implementations to be added to the system just by registering them in their init() function.
  • Rigorous Testing:
    • A conformance test suite has been added to automatically validate that any new queue implementation correctly adheres to the SafeQueue contract.
    • A centralized benchmark suite provides a consistent way to measure and compare the performance of different queue implementations under various workload scenarios.
  • Refined Core Types: The data model in the types package has been slightly updated to support this framework better.

Why This Matters

This framework is a critical building block for the Flow Control system. By decoupling the core logic from the queuing discipline, we can now:

  • Implement and experiment with different fairness and prioritization policies without changing the core controller.
  • Select the most performant queue implementation for a given flow and its configured policies.
  • Easily extend the system with new queuing strategies in the future.

This PR lays the groundwork for the upcoming work on the Flow Control policies and the controller itself.

@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Jul 10, 2025
@k8s-ci-robot k8s-ci-robot requested review from kfswain and liu-cong July 10, 2025 21:50
@k8s-ci-robot k8s-ci-robot added the needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. label Jul 10, 2025
@k8s-ci-robot
Copy link
Contributor

Hi @LukeAVanDrie. Thanks for your PR.

I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Copy link

netlify bot commented Jul 10, 2025

Deploy Preview for gateway-api-inference-extension ready!

Name Link
🔨 Latest commit 5060950
🔍 Latest deploy log https://app.netlify.com/projects/gateway-api-inference-extension/deploys/687039ad624fce0008316f3d
😎 Deploy Preview https://deploy-preview-1138--gateway-api-inference-extension.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@k8s-ci-robot k8s-ci-robot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jul 10, 2025
// from `FlowControlRequest.FlowID()`.
ID() string
// It acts as the registration key for a flow within the `ports.FlowRegistry`.
type FlowSpecification struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain This really does not need to be an interface. I changed this to be a struct and tidied up its documentation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack this is a representation of a grouping of requests? (tenant/identity/flow)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is the registration key for a logical flow (tenant/identity) with the Flow Control System. It will hold the FlowID, it's priority level (e.g., Critical), and later a FlowConfig allowing configuring policies, queue capacity, etc. at a flow level (this will be propagated from whatever CRDs we align on for flow policy).


// RequestID is the user-facing ID from the original request (`FlowControlRequest.ID()`).
RequestID() string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain These can all be obtained from the OriginalRequest(). I'm removing an unnecessary layer of indirection for simplicity and to scope down the interface.

// ItemComparator encapsulates the logic for comparing two `types.QueueItemAccessor` instances to determine their
// relative dispatch priority. It is the definitive source of ordering truth for a flow's queue.
//
// It is vended by an `IntraFlowDispatchPolicy` and used by `SafeQueue` implementations that support the
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain IntraFlowDispatchPolicy will be introduced in the next PR. It vends the ItemComparator used to configure the SafeQueue, so I had to introduce the type alongside the queues before the full context is present in the repo.

// level (e.g., 1, 3, ...), its value is smaller than all values in its subtree (min level). This structure allows for
// efficient O(1) retrieval of both the maximum and minimum priority items.
//
// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain Calling attention to this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. The documentation mentions this is currently not used in the primary implementation, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queues are not actually an operator-configurable extension point. The Flow Registry will look at the union of all policy requirements (RequiredQueueCapabilities declared by the intra and inter-flow policies) to choose the appropriate queue for a flow.

By default, we are using "FCFS" as our intra-flow dispatch policy, so it would default to "ListQueue". If the operator configured a different dispatch strategy (e.g., Earliest Deadline First), we would default to a queue supporting priority configuration--in this case, the "MaxMinHeap".

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is currently not used in the primary implementation

In short, this won't be used with the default set of policy plugins but is required for any advanced intra-flow dispatch policies.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I would argue that this would belong in a V2, I dont think we are tackling intra-flow dispatch in this iteration, but in this case, I won't have you throw away work

This commit introduces a new pluggable framework for request queuing within the EPP Flow Control layer. This change establishes the core interfaces and initial implementations needed for sophisticated request management, prioritization, and fairness.

The key components of this framework are:

- **`framework.SafeQueue` Interface**: A new contract for concurrent-safe queue implementations. It defines a standard set of behaviors for adding, removing, peeking, and managing items, ensuring that all queue plugins are interchangeable.

- **Queue Plugin Implementations**:
  - **`listqueue`**: A simple, efficient FIFO queue based on `container/list`. Ideal for basic, fair queuing workloads.
  - **`maxminheap`**: A priority queue based on a max-min heap, allowing for O(1) access to both the highest and lowest priority items. This is suitable for advanced policies that require configurable ordering.

- **Plugin Registration**: A factory pattern (`queue.MustRegisterQueue`) allows new queue implementations to be discovered and registered at runtime, making the system extensible.

- **Comprehensive Testing**:
  - A new conformance test suite (`TestQueueConformance`) ensures that all registered queue plugins strictly adhere to the `SafeQueue` contract, covering lifecycle, ordering, edge cases, and concurrency.
  - A centralized benchmark suite (`BenchmarkQueues`) provides a fair, apples-to-apples performance comparison of all queue implementations across various workload patterns.

- **Core Type Refinements**: The `types` package has been updated to support this new framework, including a refined `QueueItemAccessor` interface and a new `QueueItemHandle` for opaque, safe item manipulation.

This framework decouples the core flow control logic from the specific queuing disciplines, enabling future work on advanced dispatch and displacement policies.
@LukeAVanDrie LukeAVanDrie force-pushed the feat/flow-control-framework-plugins-queue branch from 61c5f00 to 5060950 Compare July 10, 2025 22:07
@LukeAVanDrie
Copy link
Contributor Author

/assign @ahg-g

Copy link
Collaborator

@kfswain kfswain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving some questions for now! Seems reasonable at a first pass

constructor function that matches the `queue.QueueConstructor` signature.

3. **Add to the Conformance Test**
- Add a blank import for your new package to [`conformance_test.go`](./conformance_test.go). Your queue will then be
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the goal of the conformance test here? This might be confusing as its name overlaps with our larger conformance testing suite, which is intended for Gateway API adopters

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all the extension points, framework.SafeQueue (for framework/plugins/queue/...) framework.IntraFlowDispatchPolicy (for framework/plugins/policies/intraflow/dispatch/...) and so on, I have a black box test suite that validates the interface conformance requirements.

If you poke through the test file, the intent becomes clear. Any new queue plugin implementation gets automatic test coverage for the framework.SafeQueue criteria the Flow Control system relies upon. Individual tests only need to be written for implementation-specific internal details. You can see that I did not include a test for listqueue.go for this reason, but I did add some additional coverage for maxminheap.go to validate the heap structure.

This is not API conformance, so I'm open to naming suggestions if you think the term is overloaded.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are your thoughts on: functional tests?

}

// Handle returns the underlying queue-specific raw handle.
func (lh *listItemHandle) Handle() any {
Copy link
Collaborator

@kfswain kfswain Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently the queue should be filled with requests right? I think weakly typing contents is useful if we know it has to be very general. But this case the type should be pretty tightly scoped, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't weakly typing the item (all queues store types.QueueItemAccessor); it's weakly typing the reference to the item's location in the queue.

When we call SafeQueue.Remove(handle), each queue implementation needs an efficient way to lookup the item for removal. We don't want to scan the entire queue.

For "ListQueue" I am returning a pointer to the list element here, granting us O(1) removal of items from the queue.

For "MaxMinHeap" its:

// heapItem is an internal struct to hold an item and its index in the heap.
type heapItem struct {
	item          types.QueueItemAccessor
	index         int
	isInvalidated bool
}

granting us O(log n) removal of items from the queue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it got it, and handle is used in the general framework somewhere? I see it heavily used in each implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the flow control engine typically applies this workflow:

  1. Flow Controller applies InterFlow and IntraFlow dispatch policies to get the next request for dispatch (during this, the IntraFlow policy calls queue.Peek)
  2. Perform some checks (are backends saturated, is the request already expired, etc.)... during this time other requests may have been enqueued, expired and automatically cleaned up, or other concurrent race conditions), meaning the selected request may no longer be at the head of its queue
  3. Flow Controller calls queue.Remove(selectedReq.Handle()) to remove the item from the queue.
  4. Flow Controller unblocks the request routine, thereby dispatching it.

The queue interaction patterns for a dispatched request are Add() -- which sets the handle, Peek(), Remove(req.Handle()).

// level (e.g., 1, 3, ...), its value is smaller than all values in its subtree (min level). This structure allows for
// efficient O(1) retrieval of both the maximum and minimum priority items.
//
// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. The documentation mentions this is currently not used in the primary implementation, correct?

// from `FlowControlRequest.FlowID()`.
ID() string
// It acts as the registration key for a flow within the `ports.FlowRegistry`.
type FlowSpecification struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack this is a representation of a grouping of requests? (tenant/identity/flow)

EnqueueTime() time.Time

// EffectiveTTL is the actual Time-To-Live assigned to this item by the `controller.FlowController`, taking into
// account the request's preference (`FlowControlRequest.InitialEffectiveTTL()`) and any `controller.FlowController`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InitialEffectiveTTL is based on the time the request was sent vs entry into flow control(EnqueueTime)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan on configuring a default TLL for the Flow Control system. I am allowing requests, however, to specify their own TTL preference with FlowControlRequest.InitialEffectiveTTL(). In my current POC, I use the request specific TTL as EffectiveTTL falling back to the default if unspecified.

@kfswain
Copy link
Collaborator

kfswain commented Jul 11, 2025

/ok-to-test

@k8s-ci-robot k8s-ci-robot added ok-to-test Indicates a non-member PR verified by an org member that is safe to test. and removed needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. labels Jul 11, 2025
@ahg-g
Copy link
Contributor

ahg-g commented Jul 15, 2025

/lgtm

I am fine with this to unblock the rest of the PRs; leaving the approve to @kfswain

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Jul 15, 2025
@kfswain
Copy link
Collaborator

kfswain commented Jul 15, 2025

/approve

Looks good, thanks Luke!

@k8s-ci-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: kfswain, LukeAVanDrie

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jul 15, 2025
@k8s-ci-robot k8s-ci-robot merged commit f486883 into kubernetes-sigs:main Jul 15, 2025
9 checks passed
kfswain pushed a commit to kfswain/llm-instance-gateway that referenced this pull request Jul 31, 2025
This commit introduces a new pluggable framework for request queuing within the EPP Flow Control layer. This change establishes the core interfaces and initial implementations needed for sophisticated request management, prioritization, and fairness.

The key components of this framework are:

- **`framework.SafeQueue` Interface**: A new contract for concurrent-safe queue implementations. It defines a standard set of behaviors for adding, removing, peeking, and managing items, ensuring that all queue plugins are interchangeable.

- **Queue Plugin Implementations**:
  - **`listqueue`**: A simple, efficient FIFO queue based on `container/list`. Ideal for basic, fair queuing workloads.
  - **`maxminheap`**: A priority queue based on a max-min heap, allowing for O(1) access to both the highest and lowest priority items. This is suitable for advanced policies that require configurable ordering.

- **Plugin Registration**: A factory pattern (`queue.MustRegisterQueue`) allows new queue implementations to be discovered and registered at runtime, making the system extensible.

- **Comprehensive Testing**:
  - A new conformance test suite (`TestQueueConformance`) ensures that all registered queue plugins strictly adhere to the `SafeQueue` contract, covering lifecycle, ordering, edge cases, and concurrency.
  - A centralized benchmark suite (`BenchmarkQueues`) provides a fair, apples-to-apples performance comparison of all queue implementations across various workload patterns.

- **Core Type Refinements**: The `types` package has been updated to support this new framework, including a refined `QueueItemAccessor` interface and a new `QueueItemHandle` for opaque, safe item manipulation.

This framework decouples the core flow control logic from the specific queuing disciplines, enabling future work on advanced dispatch and displacement policies.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. lgtm "Looks good to me", indicates that a PR is ready to be merged. ok-to-test Indicates a non-member PR verified by an org member that is safe to test. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants