-
Notifications
You must be signed in to change notification settings - Fork 179
feat: Introduce pluggable queue framework #1138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Introduce pluggable queue framework #1138
Conversation
Hi @LukeAVanDrie. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
✅ Deploy Preview for gateway-api-inference-extension ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
// from `FlowControlRequest.FlowID()`. | ||
ID() string | ||
// It acts as the registration key for a flow within the `ports.FlowRegistry`. | ||
type FlowSpecification struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kfswain This really does not need to be an interface. I changed this to be a struct and tidied up its documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack this is a representation of a grouping of requests? (tenant/identity/flow)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is the registration key for a logical flow (tenant/identity) with the Flow Control System. It will hold the FlowID, it's priority level (e.g., Critical), and later a FlowConfig
allowing configuring policies, queue capacity, etc. at a flow level (this will be propagated from whatever CRDs we align on for flow policy).
|
||
// RequestID is the user-facing ID from the original request (`FlowControlRequest.ID()`). | ||
RequestID() string | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kfswain These can all be obtained from the OriginalRequest()
. I'm removing an unnecessary layer of indirection for simplicity and to scope down the interface.
// ItemComparator encapsulates the logic for comparing two `types.QueueItemAccessor` instances to determine their | ||
// relative dispatch priority. It is the definitive source of ordering truth for a flow's queue. | ||
// | ||
// It is vended by an `IntraFlowDispatchPolicy` and used by `SafeQueue` implementations that support the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kfswain IntraFlowDispatchPolicy will be introduced in the next PR. It vends the ItemComparator
used to configure the SafeQueue
, so I had to introduce the type alongside the queues before the full context is present in the repo.
// level (e.g., 1, 3, ...), its value is smaller than all values in its subtree (min level). This structure allows for | ||
// efficient O(1) retrieval of both the maximum and minimum priority items. | ||
// | ||
// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kfswain Calling attention to this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. The documentation mentions this is currently not used in the primary implementation, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queues are not actually an operator-configurable extension point. The Flow Registry will look at the union of all policy requirements (RequiredQueueCapabilities
declared by the intra and inter-flow policies) to choose the appropriate queue for a flow.
By default, we are using "FCFS" as our intra-flow dispatch policy, so it would default to "ListQueue". If the operator configured a different dispatch strategy (e.g., Earliest Deadline First), we would default to a queue supporting priority configuration--in this case, the "MaxMinHeap".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is currently not used in the primary implementation
In short, this won't be used with the default set of policy plugins but is required for any advanced intra-flow dispatch policies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. I would argue that this would belong in a V2, I dont think we are tackling intra-flow dispatch in this iteration, but in this case, I won't have you throw away work
This commit introduces a new pluggable framework for request queuing within the EPP Flow Control layer. This change establishes the core interfaces and initial implementations needed for sophisticated request management, prioritization, and fairness. The key components of this framework are: - **`framework.SafeQueue` Interface**: A new contract for concurrent-safe queue implementations. It defines a standard set of behaviors for adding, removing, peeking, and managing items, ensuring that all queue plugins are interchangeable. - **Queue Plugin Implementations**: - **`listqueue`**: A simple, efficient FIFO queue based on `container/list`. Ideal for basic, fair queuing workloads. - **`maxminheap`**: A priority queue based on a max-min heap, allowing for O(1) access to both the highest and lowest priority items. This is suitable for advanced policies that require configurable ordering. - **Plugin Registration**: A factory pattern (`queue.MustRegisterQueue`) allows new queue implementations to be discovered and registered at runtime, making the system extensible. - **Comprehensive Testing**: - A new conformance test suite (`TestQueueConformance`) ensures that all registered queue plugins strictly adhere to the `SafeQueue` contract, covering lifecycle, ordering, edge cases, and concurrency. - A centralized benchmark suite (`BenchmarkQueues`) provides a fair, apples-to-apples performance comparison of all queue implementations across various workload patterns. - **Core Type Refinements**: The `types` package has been updated to support this new framework, including a refined `QueueItemAccessor` interface and a new `QueueItemHandle` for opaque, safe item manipulation. This framework decouples the core flow control logic from the specific queuing disciplines, enabling future work on advanced dispatch and displacement policies.
61c5f00
to
5060950
Compare
/assign @ahg-g |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving some questions for now! Seems reasonable at a first pass
constructor function that matches the `queue.QueueConstructor` signature. | ||
|
||
3. **Add to the Conformance Test** | ||
- Add a blank import for your new package to [`conformance_test.go`](./conformance_test.go). Your queue will then be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the goal of the conformance test here? This might be confusing as its name overlaps with our larger conformance testing suite, which is intended for Gateway API adopters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For all the extension points, framework.SafeQueue
(for framework/plugins/queue/...) framework.IntraFlowDispatchPolicy
(for framework/plugins/policies/intraflow/dispatch/...) and so on, I have a black box test suite that validates the interface conformance requirements.
If you poke through the test file, the intent becomes clear. Any new queue plugin implementation gets automatic test coverage for the framework.SafeQueue
criteria the Flow Control system relies upon. Individual tests only need to be written for implementation-specific internal details. You can see that I did not include a test for listqueue.go
for this reason, but I did add some additional coverage for maxminheap.go
to validate the heap structure.
This is not API conformance, so I'm open to naming suggestions if you think the term is overloaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are your thoughts on: functional tests?
} | ||
|
||
// Handle returns the underlying queue-specific raw handle. | ||
func (lh *listItemHandle) Handle() any { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently the queue should be filled with requests right? I think weakly typing contents is useful if we know it has to be very general. But this case the type should be pretty tightly scoped, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't weakly typing the item (all queues store types.QueueItemAccessor
); it's weakly typing the reference to the item's location in the queue.
When we call SafeQueue.Remove(handle)
, each queue implementation needs an efficient way to lookup the item for removal. We don't want to scan the entire queue.
For "ListQueue" I am returning a pointer to the list element here, granting us O(1) removal of items from the queue.
For "MaxMinHeap" its:
// heapItem is an internal struct to hold an item and its index in the heap.
type heapItem struct {
item types.QueueItemAccessor
index int
isInvalidated bool
}
granting us O(log n) removal of items from the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it got it, and handle is used in the general framework somewhere? I see it heavily used in each implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the flow control engine typically applies this workflow:
- Flow Controller applies InterFlow and IntraFlow dispatch policies to get the next request for dispatch (during this, the IntraFlow policy calls queue.Peek)
- Perform some checks (are backends saturated, is the request already expired, etc.)... during this time other requests may have been enqueued, expired and automatically cleaned up, or other concurrent race conditions), meaning the selected request may no longer be at the head of its queue
- Flow Controller calls queue.Remove(selectedReq.Handle()) to remove the item from the queue.
- Flow Controller unblocks the request routine, thereby dispatching it.
The queue interaction patterns for a dispatched request are Add() -- which sets the handle, Peek(), Remove(req.Handle()).
// level (e.g., 1, 3, ...), its value is smaller than all values in its subtree (min level). This structure allows for | ||
// efficient O(1) retrieval of both the maximum and minimum priority items. | ||
// | ||
// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. The documentation mentions this is currently not used in the primary implementation, correct?
// from `FlowControlRequest.FlowID()`. | ||
ID() string | ||
// It acts as the registration key for a flow within the `ports.FlowRegistry`. | ||
type FlowSpecification struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack this is a representation of a grouping of requests? (tenant/identity/flow)
EnqueueTime() time.Time | ||
|
||
// EffectiveTTL is the actual Time-To-Live assigned to this item by the `controller.FlowController`, taking into | ||
// account the request's preference (`FlowControlRequest.InitialEffectiveTTL()`) and any `controller.FlowController` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InitialEffectiveTTL is based on the time the request was sent vs entry into flow control(EnqueueTime)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan on configuring a default TLL for the Flow Control system. I am allowing requests, however, to specify their own TTL preference with FlowControlRequest.InitialEffectiveTTL()
. In my current POC, I use the request specific TTL as EffectiveTTL
falling back to the default if unspecified.
/ok-to-test |
/lgtm I am fine with this to unblock the rest of the PRs; leaving the approve to @kfswain |
/approve Looks good, thanks Luke! |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: kfswain, LukeAVanDrie The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This commit introduces a new pluggable framework for request queuing within the EPP Flow Control layer. This change establishes the core interfaces and initial implementations needed for sophisticated request management, prioritization, and fairness. The key components of this framework are: - **`framework.SafeQueue` Interface**: A new contract for concurrent-safe queue implementations. It defines a standard set of behaviors for adding, removing, peeking, and managing items, ensuring that all queue plugins are interchangeable. - **Queue Plugin Implementations**: - **`listqueue`**: A simple, efficient FIFO queue based on `container/list`. Ideal for basic, fair queuing workloads. - **`maxminheap`**: A priority queue based on a max-min heap, allowing for O(1) access to both the highest and lowest priority items. This is suitable for advanced policies that require configurable ordering. - **Plugin Registration**: A factory pattern (`queue.MustRegisterQueue`) allows new queue implementations to be discovered and registered at runtime, making the system extensible. - **Comprehensive Testing**: - A new conformance test suite (`TestQueueConformance`) ensures that all registered queue plugins strictly adhere to the `SafeQueue` contract, covering lifecycle, ordering, edge cases, and concurrency. - A centralized benchmark suite (`BenchmarkQueues`) provides a fair, apples-to-apples performance comparison of all queue implementations across various workload patterns. - **Core Type Refinements**: The `types` package has been updated to support this new framework, including a refined `QueueItemAccessor` interface and a new `QueueItemHandle` for opaque, safe item manipulation. This framework decouples the core flow control logic from the specific queuing disciplines, enabling future work on advanced dispatch and displacement policies.
This pull request introduces a pluggable queueing framework to the EPP Flow Control layer. The goal of this change is to establish a robust and extensible system for managing in-flight request storage, which is a prerequisite for implementing advanced scheduling, prioritization, and fairness policies.
This work tracks #674
This new framework is built on a set of core interfaces and includes two initial queue implementations to demonstrate its flexibility.
Key Changes
framework.SafeQueue
Interface: Defines a clear contract for concurrent-safe queue implementations. This ensures that any queue plugin can be used interchangeably by the core controller.listqueue
: A simple and efficient FIFO (First-In, First-Out) queue, ideal for standard workloads.maxminheap
: A priority queue that provides O(1) access to both the highest and lowest priority items, enabling more sophisticated, policy-driven ordering.init()
function.SafeQueue
contract.types
package has been slightly updated to support this framework better.Why This Matters
This framework is a critical building block for the Flow Control system. By decoupling the core logic from the queuing discipline, we can now:
This PR lays the groundwork for the upcoming work on the Flow Control policies and the controller itself.