Skip to content

Commit 0b68eff

Browse files
authored
core:Change address deduping to be across EAGs (#11345)
* Change dedup to be cross EAG, not just within an EAG
1 parent 9e21459 commit 0b68eff

File tree

4 files changed

+50
-24
lines changed

4 files changed

+50
-24
lines changed

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9191
handleNameResolutionError(unavailableStatus);
9292
return unavailableStatus;
9393
}
94-
95-
List<EquivalentAddressGroup> cleanServers = new ArrayList<>();
96-
9794
for (EquivalentAddressGroup eag : servers) {
9895
if (eag == null) {
9996
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
@@ -103,12 +100,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
103100
handleNameResolutionError(unavailableStatus);
104101
return unavailableStatus;
105102
}
106-
cleanServers.add(removeDuplicateAddresses(eag));
107103
}
108104

109105
// Since we have a new set of addresses, we are again at first pass
110106
firstPass = true;
111107

108+
List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
109+
112110
// We can optionally be configured to shuffle the address list. This can help better distribute
113111
// the load.
114112
if (resolvedAddresses.getLoadBalancingPolicyConfig()
@@ -121,7 +119,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
121119
}
122120
}
123121

124-
// Make sure we're storing our own list rather than what was passed in
125122
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
126123
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
127124

@@ -181,21 +178,23 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
181178
return Status.OK;
182179
}
183180

184-
private static EquivalentAddressGroup removeDuplicateAddresses(EquivalentAddressGroup eag) {
185-
Set<SocketAddress> addressSet = new HashSet<>();
186-
ArrayList<SocketAddress> addrs = new ArrayList<>(); // maintains order
181+
private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
182+
Set<SocketAddress> seenAddresses = new HashSet<>();
183+
List<EquivalentAddressGroup> newGroups = new ArrayList<>();
187184

188-
for (SocketAddress address : eag.getAddresses()) {
189-
if (addressSet.add(address)) {
190-
addrs.add(address);
185+
for (EquivalentAddressGroup group : groups) {
186+
List<SocketAddress> addrs = new ArrayList<>();
187+
for (SocketAddress addr : group.getAddresses()) {
188+
if (seenAddresses.add(addr)) {
189+
addrs.add(addr);
190+
}
191+
}
192+
if (!addrs.isEmpty()) {
193+
newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
191194
}
192195
}
193196

194-
if (addressSet.size() == eag.getAddresses().size()) {
195-
return eag;
196-
}
197-
198-
return new EquivalentAddressGroup(addrs, eag.getAttributes());
197+
return newGroups;
199198
}
200199

201200
@Override

core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,14 @@ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider {
3636
public static final String GRPC_PF_USE_HAPPY_EYEBALLS = "GRPC_PF_USE_HAPPY_EYEBALLS";
3737
private static final String SHUFFLE_ADDRESS_LIST_KEY = "shuffleAddressList";
3838

39-
static boolean enableNewPickFirst =
39+
private static boolean enableNewPickFirst =
4040
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false);
4141

4242
public static boolean isEnabledHappyEyeballs() {
4343

4444
return GrpcUtil.getFlag(GRPC_PF_USE_HAPPY_EYEBALLS, false);
4545
}
4646

47-
@VisibleForTesting
48-
public static boolean isEnableNewPickFirst() {
49-
return enableNewPickFirst;
50-
}
51-
5247
@Override
5348
public boolean isAvailable() {
5449
return true;

core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class AutoConfiguredLoadBalancerFactoryTest {
9999
new FakeLoadBalancerProvider("test_lb2", testLbBalancer2, nextParsedConfigOrError2)));
100100

101101
private final Class<? extends LoadBalancer> pfLbClass =
102-
PickFirstLoadBalancerProvider.enableNewPickFirst
102+
PickFirstLoadBalancerProvider.isEnabledNewPickFirst()
103103
? PickFirstLeafLoadBalancer.class
104104
: PickFirstLoadBalancer.class;
105105

core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ public void pickAfterResolutionAfterTransientValue() {
565565
}
566566

567567
@Test
568-
public void pickAWithDupAddressesUpDownUp() {
568+
public void pickWithDupAddressesUpDownUp() {
569569
InOrder inOrder = inOrder(mockHelper);
570570
SocketAddress socketAddress = servers.get(0).getAddresses().get(0);
571571
EquivalentAddressGroup badEag = new EquivalentAddressGroup(
@@ -599,6 +599,38 @@ public void pickAWithDupAddressesUpDownUp() {
599599
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
600600
}
601601

602+
@Test
603+
public void pickWithDupEagsUpDownUp() {
604+
InOrder inOrder = inOrder(mockHelper);
605+
List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0), servers.get(0));
606+
607+
loadBalancer.acceptResolvedAddresses(
608+
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
609+
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
610+
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
611+
verify(mockSubchannel1).start(stateListenerCaptor.capture());
612+
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
613+
614+
reset(mockHelper);
615+
616+
// An error has happened.
617+
Status error = Status.UNAVAILABLE.withDescription("boom!");
618+
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
619+
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
620+
inOrder.verify(mockHelper).refreshNameResolution();
621+
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
622+
623+
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
624+
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
625+
verifyNoMoreInteractions(mockHelper);
626+
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
627+
628+
// Transition from CONNECTING to READY .
629+
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
630+
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
631+
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
632+
}
633+
602634
@Test
603635
public void nameResolutionError() {
604636
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");

0 commit comments

Comments
 (0)