Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Nov 10, 2023
1 parent 1b71001 commit 507d772
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 31 deletions.
4 changes: 3 additions & 1 deletion api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ public abstract class LoadBalancer {
HEALTH_CONSUMER_LISTENER_ARG_KEY =
LoadBalancer.CreateSubchannelArgs.Key.create("internal:health-check-consumer-listener");

@Internal
public static final Attributes.Key<LoadBalancer.SubchannelStateListener>
HEALTH_PRODUCER_LISTENER_KEY = Attributes.Key.create("health-check-producer");
HEALTH_PRODUCER_LISTENER_KEY =
Attributes.Key.create("internal:health-check-producer-listener");

public static final SubchannelPicker EMPTY_PICKER = new SubchannelPicker() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ public void typicalWorkflow() {
deliverSubchannelState(i, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
deliverSubchannelState(i, ConnectivityStateInfo.forNonError(IDLE));

inOrder.verify(mockHealthListener).onSubchannelState(null);
inOrder.verify(mockStateListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(CONNECTING)));
inOrder.verify(mockHealthListener).onSubchannelState(
Expand Down Expand Up @@ -401,8 +400,6 @@ public void healthCheckDisabledWhenServiceNotImplemented() {

InOrder inOrder = inOrder(mockStateListeners[0], mockStateListeners[1], mockHealthListeners[0],
mockHealthListeners[1]);
inOrder.verify(mockHealthListeners[0]).onSubchannelState(null);
inOrder.verify(mockHealthListeners[1]).onSubchannelState(null);

for (int i = 0; i < 2; i++) {
deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY));
Expand Down Expand Up @@ -482,7 +479,6 @@ public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
backoffPolicy1, backoffPolicy2);

deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHealthListener).onSubchannelState(null);
inOrder.verify(mockListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(mockHealthListener).onSubchannelState(
Expand Down Expand Up @@ -559,7 +555,6 @@ public void serverRespondResetsBackoff() {
inOrder(mockStateListener, mockHealthListener,
backoffPolicyProvider, backoffPolicy1, backoffPolicy2);

inOrder.verify(mockHealthListener).onSubchannelState(null);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockStateListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(READY)));
Expand Down Expand Up @@ -846,8 +841,7 @@ public void serviceConfigChangesServiceNameWhenRpcActive() {
assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[0]);
InOrder inOrder = inOrder(origLb, mockListener, mockHealthListener);

inOrder.verify(mockHealthListener).onSubchannelState(null);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(mockHealthListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(CONNECTING)));
Expand Down Expand Up @@ -908,7 +902,6 @@ public void serviceConfigChangesServiceNameWhenRetryPending() {
SubchannelStateListener mockListener = mockStateListeners[0];
assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[0]);
InOrder inOrder = inOrder(origLb, mockListener, mockHealthListener);
inOrder.verify(mockHealthListener).onSubchannelState(null);

deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
Expand Down
10 changes: 6 additions & 4 deletions util/src/main/java/io/grpc/util/HealthProducerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ public Attributes getAttributes() {
/**
* Used by a health producer system to construct the subchannel health notification chain and
* notify aggregated health status with cached health status.
* A parent health producer system then will call {@link #onSubchannelState} to notify health status
* change, and {@link #thisHealthState} is used by the current child health producer.
* A parent health producer system then will call {@link #onSubchannelState} to notify health
* status change, and {@link #thisHealthState} is used by the current child health producer.
* Notification waits on both parent and child health status present. Users may reset health
* status to prevent stale health status to be consumed.
* */
public static final class HealthCheckProducerListener implements LoadBalancer.SubchannelStateListener {
public static final class HealthCheckProducerListener
implements LoadBalancer.SubchannelStateListener {

private ConnectivityStateInfo upperStreamHealthStatus = null;
private ConnectivityStateInfo thisHealthState = null;
Expand Down Expand Up @@ -152,12 +153,13 @@ private void notifyHealth() {
} else if (ConnectivityState.READY == upperStreamHealthStatus.getState()) {
next = thisHealthState;
} else if (ConnectivityState.TRANSIENT_FAILURE == upperStreamHealthStatus.getState()) {
log.log(Level.FINE, "todo: add producer name in description " + healthProducerName);
// todo: include producer name and upper stream health status in description
next = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
thisHealthState.getStatus().getDescription()));
}
if (!Objects.equal(concludedHealthStatus, next)) {
concludedHealthStatus = next;
log.log(Level.FINE, "Health producer " + healthProducerName + ":" + next);
delegate.onSubchannelState(next);
}
}
Expand Down
4 changes: 0 additions & 4 deletions util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ConnectivityState;
Expand Down Expand Up @@ -56,9 +55,6 @@
public abstract class MultiChildLoadBalancer extends LoadBalancer {

private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName());



private final Map<Object, ChildLbState> childLbStates = new LinkedHashMap<>();
private final Helper helper;
// Set to true if currently in the process of handling resolved addresses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ class OutlierDetectionSubchannel extends ForwardingSubchannel {
private AddressTracker addressTracker;
private boolean ejected;
private ConnectivityStateInfo lastSubchannelState;

/**
* Only one of subchannelStatelistener or healthListener exists. Backward compatibility guaranteed.
* Only one of subchannelStatelistener or healthListener exists. Backward compatible.
* Before generic health check: subchannelStatelistener is nonnull and is used for state
* notification, considering health.
* After generic health check: healthListener is nonnull. Raw connectivity state is
Expand All @@ -261,7 +262,7 @@ class OutlierDetectionSubchannel extends ForwardingSubchannel {
}

OutlierDetectionSubchannel(Subchannel delegate,
@Nullable HealthProducerUtil.HealthCheckProducerListener hcListener) {
@Nullable HealthProducerUtil.HealthCheckProducerListener hcListener) {
this.delegate = delegate;
this.logger = delegate.getChannelLogger();
this.healthListener = hcListener;
Expand Down Expand Up @@ -359,9 +360,9 @@ void eject() {
void uneject() {
ejected = false;
if (healthListener != null) {
healthListener.thisHealthState(
ConnectivityStateInfo.forNonError(ConnectivityState.READY));
logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
healthListener.thisHealthState(
ConnectivityStateInfo.forNonError(ConnectivityState.READY));
logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
} else {
if (lastSubchannelState != null) {
subchannelStateListener.onSubchannelState(lastSubchannelState);
Expand Down
6 changes: 1 addition & 5 deletions util/src/test/java/io/grpc/util/HealthProducerUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,27 @@

package io.grpc.util;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.HEALTH_PRODUCER_LISTENER_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static com.google.common.truth.Truth.assertThat;

import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
assertThat(healthListeners.get(eag)).isNotNull();
HealthProducerUtil.HealthCheckProducerListener healthCheckProducerListener =
(HealthProducerUtil.HealthCheckProducerListener)
subchannel.getAttributes().get(HEALTH_PRODUCER_LISTENER_KEY);
subchannel.getAttributes().get(HEALTH_PRODUCER_LISTENER_KEY);
assertThat(healthCheckProducerListener).isNotNull();
healthCheckProducerListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
subchannelList.add(subchannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ private final class FakeLoadBalancer extends LoadBalancer {
}

@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
subchannelList = new ArrayList<>();
for (EquivalentAddressGroup eag: resolvedAddresses.getAddresses()) {
Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
Expand All @@ -1219,7 +1219,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
subchannel.start(mock(SubchannelStateListener.class));
deliverSubchannelState(READY);
}
return true;
return Status.OK;
}

@Override
Expand Down

0 comments on commit 507d772

Please sign in to comment.