feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651
feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651emil10001 wants to merge 9 commits into
Conversation
dfawley
left a comment
There was a problem hiding this comment.
Thanks for the PR! I haven't looked at the tests yet, but here's a first round of review comments.
| #[default] | ||
| PickFirst, | ||
| RoundRobin, | ||
| LeastRequest, |
There was a problem hiding this comment.
This is a pre-defined list of LB policies limited to just PF and RR. But I think we can do this for now if we put a TODO to remove. cc @nathanielford for when he implements support for the newer field that lets you specify any policy.
There was a problem hiding this comment.
I think I'd prefer to drop it now. It can be added back if it makes sense down the road.
There was a problem hiding this comment.
Huh, it won't build without this being there. I've put the TODO in.
| .state | ||
| .picker | ||
| .pick(&crate::core::RequestHeaders::default()) |
There was a problem hiding this comment.
We should not be making picks for fake RPCs. If we can't map via the child itself, then we'll need another way to retrieve its active subchannel.
There was a problem hiding this comment.
I think this has been addressed.
| // Clean up stale counters | ||
| self.subchannel_counters | ||
| .retain(|weak, _| weak.upgrade().is_some()); |
There was a problem hiding this comment.
I think this would be better if it reused the known-active subchannels that we got above instead of keeping all the ones that happen to still upgrade.
There was a problem hiding this comment.
I think I've addressed this.
| } | ||
| let aggregate_state = self.child_manager.aggregate_states(); | ||
|
|
||
| if aggregate_state == ConnectivityState::Ready { |
There was a problem hiding this comment.
Optional: maybe let picker = if ... and send the update commonly between the if/else.
There was a problem hiding this comment.
The logic around this was changed, updated behavior looks reasonable to me based on other feedback.
| let picker = self | ||
| .child_manager | ||
| .children() | ||
| .find(|cs| cs.state.connectivity_state == aggregate_state) | ||
| .map(|cs| cs.state.picker.clone()) | ||
| .unwrap_or_else(|| { | ||
| Arc::new(crate::client::load_balancing::QueuingPicker) as Arc<dyn Picker> | ||
| }); |
There was a problem hiding this comment.
What picker is coming out of this? Is it just the first child whose state is the same as the aggregate state? We should round-robin over all of those children that match instead. You should ideally share the existing RoundRobin picker for this.
There was a problem hiding this comment.
This has been updated to handle an empty state, a case where there's only 1, and a case where we use the existing RoundRobin picker. It could possibly be simplified further to drop the case where there's only 1.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a new "least request" load balancing policy (least_request.rs) to the gRPC client, integrates it into the channel controller and service configuration, and adds comprehensive unit and integration tests. The code review feedback identifies several critical improvement opportunities: a bug where configuration updates (such as choice_count) are not propagated to the active picker because update_picker returns early when child states are unchanged; an issue where parent attributes are discarded when sharding resolver updates for child policies; and a recommendation to sample distinct subchannels (without replacement) to maximize the effectiveness of the "power of two choices" algorithm.
Implements the "all weights equal" Least Request Load Balancing policy in
gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail
latencies in heterogeneous environments by tracking active request counts per
endpoint and directing new requests to the backend with the lowest load.
Detailed Changes:
1. Core Load Balancing Policy (`least_request.rs`):
- Defined `LeastRequestLoadBalancingConfig` to parse and validate the
`choiceCount` config parameter (default = 2, clamped from 2 to 10).
- Implemented `LeastRequestBuilder` registering policy name
`least_request_experimental`.
- Implemented `LeastRequestPolicy` managing endpoint-level connections
via `ChildManager` children delegating to `pick_first`.
- Maintained a persistent mapping of weak subchannel references to active
request counters (`subchannel_counters`) so that outstanding request
metrics survive picker updates and name re-resolutions.
- Implemented `LeastRequestPicker` utilizing a random sampling selection
algorithm over `choice_count` subchannels.
2. Active Request Cancellation Safety:
- Identified and resolved a request counter leak bug where async task
cancellations during `dyn_invoke.await` dropped the `Pick` closure
without calling it.
- Implemented a custom, defusable `ActiveRequestGuard` using an `AtomicBool`
inside `LeastRequestPicker::pick`. The guard guarantees that the active
request count is decremented upon drop if the picker's `on_complete`
callback is never invoked.
3. Channel & Service Config Integration:
- Registered the builder with the global LB registry in `Channel::new`
inside `channel.rs`.
- Added `CallbackRecvStream` wrapping the stream in the channel's
`Invoke` implementation to trigger `on_complete` callbacks when client
streams are completed or dropped.
- Added `LeastRequest` variant to `LbPolicyType` enum in `service_config.rs`.
- Mapped `LbPolicyType::LeastRequest` configuration inside
`ResolverChannelController::update` in `channel.rs`.
4. Test Additions & Verification:
- Added comprehensive unit tests in `least_request.rs` covering configuration
parsing/clamping/validation, least request selection, tie-breaking,
fewer subchannels than choice count, and cancellation drop-guard safety.
- Modified the `InMemoryResolver` in `inmemory/mod.rs` to dynamically set
the `LeastRequest` load-balancing policy based on target URI path prefixes.
- Wrote a robust E2E integration test `test_in_memory_least_request_load_balancing`
in `inmemory/mod.rs` verifying dynamic load balancing across multiple
in-memory backends concurrently.
7a5c94e to
c16927a
Compare
…feat-least-request
…feat-least-request
…feat-least-request
|
Ok, I believe all of the comments have been addressed, and there aren't any more merge conflicts. However, testing is currently failing for some apparently unrelated reason, it's complaining about TLS, which I don't have in my PR. There also appear to be some CI failures around unresolved dependencies which are also unrelated to my PR. |
| impl<T> Child<T> { | ||
| pub fn subchannels(&self) -> impl Iterator<Item = Arc<dyn Subchannel>> + '_ { | ||
| self.subchannels.iter().filter_map(|weak| weak.upgrade()) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Sorry, I should have caught this sooner.
Please see the updates to the least-request gRFC in A61:
https://github.com/grpc/proposal/blob/master/A61-IPv4-IPv6-dualstack-backends.md#wrr-in-javago (and the subsequent section about least-request specifically)
We should not be looking at the subchannels of the children at all. We should store the request counts for the endpoint instead.
| if state.active.swap(false, Ordering::Relaxed) { | ||
| state.counter.fetch_sub(1, Ordering::Relaxed); | ||
| } |
There was a problem hiding this comment.
This is redundant logic. The drop for state already does this. So you can just do either:
| if state.active.swap(false, Ordering::Relaxed) { | |
| state.counter.fetch_sub(1, Ordering::Relaxed); | |
| } | |
| let _ = state; |
Which captures state in the closure and its drop will be run automatically when the closure finishes.
Or:
| if state.active.swap(false, Ordering::Relaxed) { | |
| state.counter.fetch_sub(1, Ordering::Relaxed); | |
| } | |
| drop(state); |
Implements the "all weights equal" Least Request Load Balancing policy in gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail latencies in heterogeneous environments by tracking active request counts per endpoint and directing new requests to the backend with the lowest load.
Detailed Changes:
Core Load Balancing Policy (
least_request.rs):LeastRequestLoadBalancingConfigto parse and validate thechoiceCountconfig parameter (default = 2, clamped from 2 to 10).LeastRequestBuilderregistering policy nameleast_request_experimental.LeastRequestPolicymanaging endpoint-level connections viaChildManagerchildren delegating topick_first.subchannel_counters) so that outstanding request metrics survive picker updates and name re-resolutions.LeastRequestPickerutilizing a random sampling selection algorithm overchoice_countsubchannels.Active Request Cancellation Safety:
dyn_invoke.awaitdropped thePickclosure without calling it.ActiveRequestGuardusing anAtomicBoolinsideLeastRequestPicker::pick. The guard guarantees that the active request count is decremented upon drop if the picker'son_completecallback is never invoked.Channel & Service Config Integration:
Channel::newinsidechannel.rs.CallbackRecvStreamwrapping the stream in the channel'sInvokeimplementation to triggeron_completecallbacks when client streams are completed or dropped.LeastRequestvariant toLbPolicyTypeenum inservice_config.rs.LbPolicyType::LeastRequestconfiguration insideResolverChannelController::updateinchannel.rs.Test Additions & Verification:
least_request.rscovering configuration parsing/clamping/validation, least request selection, tie-breaking, fewer subchannels than choice count, and cancellation drop-guard safety.InMemoryResolverininmemory/mod.rsto dynamically set theLeastRequestload-balancing policy based on target URI path prefixes.test_in_memory_least_request_load_balancingininmemory/mod.rsverifying dynamic load balancing across multiple in-memory backends concurrently.Motivation
Solution