Skip to content

Commit

Permalink
feat: apply grpc service config from consul (grpc-ecosystem#1045)
Browse files Browse the repository at this point in the history
  • Loading branch information
onyn committed Feb 2, 2024
1 parent a2e9520 commit dd71c00
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -58,13 +61,15 @@ public class DiscoveryClientNameResolver extends NameResolver {
@Deprecated
private static final String LEGACY_CLOUD_DISCOVERY_METADATA_PORT = "gRPC.port";
private static final List<ServiceInstance> KEEP_PREVIOUS = null;
private static final Gson GSON = new Gson();

private final String name;
private final DiscoveryClient client;
private final SynchronizationContext syncContext;
private final Consumer<DiscoveryClientNameResolver> shutdownHook;
private final SharedResourceHolder.Resource<Executor> executorResource;
private final boolean usingExecutorResource;
private final ServiceConfigParser serviceConfigParser;

// The field must be accessed from syncContext, although the methods on an Listener2 can be called
// from any thread.
Expand Down Expand Up @@ -93,6 +98,7 @@ public DiscoveryClientNameResolver(final String name, final DiscoveryClient clie
this.executor = args.getOffloadExecutor();
this.usingExecutorResource = this.executor == null;
this.executorResource = executorResource;
this.serviceConfigParser = args.getServiceConfigParser();
}

/**
Expand Down Expand Up @@ -187,6 +193,27 @@ protected int getGrpcPort(final ServiceInstance instance) {
}
}

/**
* Extracts the gRPC service config from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return The gRPC service config or null.
*/
protected String getServiceConfig(final List<ServiceInstance> instances) {
for (final ServiceInstance inst : instances) {
final Map<String, String> metadata = inst.getMetadata();
if (metadata == null || metadata.isEmpty()) {
continue;
}
final String metaValue = metadata.get(CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG);
if (metaValue != null && !metaValue.isEmpty()) {
return metaValue;
}
}
return null;
}


/**
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
* custom attributes.
Expand Down Expand Up @@ -314,11 +341,31 @@ private List<ServiceInstance> resolveInternal() {
return KEEP_PREVIOUS;
}

final ResolutionResult.Builder result = ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList));

final String serviceConfig = getServiceConfig(newInstanceList);
if (serviceConfig != null) {
log.debug("Found service config for {}", getName());
log.trace("Service config for {}: {}", getName(), serviceConfig);
try {
@SuppressWarnings("unchecked")
Map<String, ?> parsedServiceConfig = GSON.fromJson(serviceConfig, Map.class);
result.setServiceConfig(serviceConfigParser.parseServiceConfig(parsedServiceConfig));
} catch (JsonSyntaxException e) {
result.setServiceConfig(
ConfigOrError.fromError(
Status.UNKNOWN
.withDescription("Failed to parse grpc service config")
.withCause(e)
)
);
}
}

// Set new servers
log.debug("Ready to update server list for {}", getName());
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList))
.build());
this.savedListener.onResult(result.build());
log.info("Done updating server list for {}", getName());
return newInstanceList;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package net.devh.boot.grpc.client.nameresolver;

import static org.assertj.core.api.Assertions.assertThat;

import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ScParser;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import net.devh.boot.grpc.common.util.GrpcUtils;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClient;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties;

/**
* Test for {@link DiscoveryClientNameResolver}.
*/
public class DiscoveryClientNameResolverTest {

private final NameResolver.Args args = NameResolver.Args.newBuilder()
.setDefaultPort(1212)
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(
new SynchronizationContext((t, e) -> {
throw new AssertionError(e);
})
)
.setServiceConfigParser(
new ScParser(
true, 10, 10, new AutoConfiguredLoadBalancerFactory("pick_first")
)
)
.setOffloadExecutor(Runnable::run)
.build();

@Test
void testValidServiceConfig() {
var validServiceConfig = """
{
"loadBalancingConfig": [
{"round_robin": {}}
],
"methodConfig": [
{
"name": [{}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.05s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"ABORTED",
"DATA_LOSS",
"INTERNAL",
"DEADLINE_EXCEEDED"
]
},
"timeout": "5s"
}
]
}
""";
var listener = resolveServiceAndVerify("test1", validServiceConfig);
var serviceConf = listener.getResult().getServiceConfig();
assertThat(serviceConf).isNotNull();
assertThat(serviceConf.getConfig()).isNotNull();
assertThat(serviceConf.getError()).isNull();
}

@Test
void testBrokenServiceConfig() {
var listener = resolveServiceAndVerify("test2", "intentionally invalid service config");
var serviceConf = listener.getResult().getServiceConfig();
assertThat(serviceConf).isNotNull();
assertThat(serviceConf.getConfig()).isNull();
assertThat(serviceConf.getError()).extracting(Status::getCode).isEqualTo(Status.Code.UNKNOWN);
}

private TestableListener resolveServiceAndVerify(String serviceName, String serviceConfig) {
var props = new SimpleDiscoveryProperties();
var service = new DefaultServiceInstance(serviceName + "-1", serviceName, "127.0.0.1", 3322, false);
var meta = service.getMetadata();
meta.put(GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT, "6688");
meta.put(GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG, serviceConfig);
props.setInstances(Map.of(serviceName, List.of(service)));
var disco = new SimpleDiscoveryClient(props);
var dcnr = new DiscoveryClientNameResolver(serviceName, disco, args, null, null);

var listener = new TestableListener();
dcnr.start(listener);

assertThat(listener.isErrorWasSet()).isFalse();
assertThat(listener.isResultWasSet()).isTrue();
var addr = (InetSocketAddress) listener.getResult().getAddresses().get(0).getAddresses().get(0);
assertThat(addr.getPort()).isEqualTo(6688);
assertThat(addr.getHostString()).isEqualTo("127.0.0.1");
return listener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package net.devh.boot.grpc.client.nameresolver;

import io.grpc.NameResolver;
import io.grpc.Status;
import lombok.Getter;

@Getter
public class TestableListener extends NameResolver.Listener2 {

private NameResolver.ResolutionResult result;

private Status error;

private boolean resultWasSet = false;

private boolean errorWasSet = false;

@Override
public void onResult(NameResolver.ResolutionResult resolutionResult) {
this.result = resolutionResult;
resultWasSet = true;
}

@Override
public void onError(Status error) {
this.error = error;
errorWasSet = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public final class GrpcUtils {
*/
public static final String CLOUD_DISCOVERY_METADATA_PORT = "gRPC_port";

/**
* The cloud discovery metadata key used to identify service config.
*/
public static final String CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG = "gRPC_service_config";

/**
* The constant for the grpc server port, -1 represents don't start an inter process server.
*/
Expand Down

0 comments on commit dd71c00

Please sign in to comment.