Skip to content

Commit

Permalink
JAVA-2914: Transform node filter into a more flexible node distance e…
Browse files Browse the repository at this point in the history
…valuator (apache#1524)
  • Loading branch information
adutra authored Mar 22, 2021
1 parent 02da738 commit 130fd25
Show file tree
Hide file tree
Showing 24 changed files with 517 additions and 198 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 4.11.0 (in progress)

- [improvement] JAVA-2914: Transform node filter into a more flexible node distance evaluator
- [improvement] JAVA-2929: Revisit node-level metric eviction
- [new feature] JAVA-2830: Add mapper support for Java streams
- [bug] JAVA-2928: Generate counter increment/decrement constructs compatible with legacy C*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ private LoadBalancingInfo getLoadBalancingInfo(DriverExecutionProfile driverExec
"localDataCenter",
driverExecutionProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER));
}
options.put(
"filterFunction",
driverExecutionProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_FILTER_CLASS));
@SuppressWarnings("deprecation")
boolean hasNodeFiltering =
driverExecutionProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_FILTER_CLASS)
|| driverExecutionProfile.isDefined(
DefaultDriverOption.LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS);
options.put("filterFunction", hasNodeFiltering);
ClassSettingDetails loadBalancingDetails =
PackageUtil.getLoadBalancingDetails(
driverExecutionProfile.getString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ public enum DefaultDriverOption implements DriverOption {
* A custom filter to include/exclude nodes.
*
* <p>Value-Type: {@link String}
*
* @deprecated use {@link #LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS} instead.
*/
@Deprecated
LOAD_BALANCING_FILTER_CLASS("basic.load-balancing-policy.filter.class"),

/**
Expand Down Expand Up @@ -852,6 +855,14 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value-type: {@link String}
*/
METRICS_ID_GENERATOR_PREFIX("advanced.metrics.id-generator.prefix"),

/**
* The class name of a custom {@link
* com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator}.
*
* <p>Value-Type: {@link String}
*/
LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS("basic.load-balancing-policy.evaluator.class"),
;

private final String path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,21 @@ public String toString() {
public static final TypedDriverOption<String> LOAD_BALANCING_LOCAL_DATACENTER =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, GenericType.STRING);
/** A custom filter to include/exclude nodes. */
/**
* A custom filter to include/exclude nodes.
*
* @deprecated Use {@link #LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS} instead.
*/
@Deprecated
public static final TypedDriverOption<String> LOAD_BALANCING_FILTER_CLASS =
new TypedDriverOption<>(DefaultDriverOption.LOAD_BALANCING_FILTER_CLASS, GenericType.STRING);
/**
* The class name of a custom {@link
* com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator}.
*/
public static final TypedDriverOption<String> LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_DISTANCE_EVALUATOR_CLASS, GenericType.STRING);
/** The timeout to use for internal queries that run as part of the initialization process. */
public static final TypedDriverOption<Duration> CONNECTION_INIT_QUERY_TIMEOUT =
new TypedDriverOption<>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core.loadbalancing;

import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
* A pluggable {@link NodeDistance} evaluator.
*
* <p>Node distance evaluators are recognized by all the driver built-in load balancing policies.
* They can be specified {@linkplain
* com.datastax.oss.driver.api.core.session.SessionBuilder#withNodeDistanceEvaluator(String,
* NodeDistanceEvaluator) programmatically} or through the configuration (with the {@code
* load-balancing-policy.evaluator.class} option).
*
* @see com.datastax.oss.driver.api.core.session.SessionBuilder#withNodeDistanceEvaluator(String,
* NodeDistanceEvaluator)
*/
@FunctionalInterface
public interface NodeDistanceEvaluator {

/**
* Evaluates the distance to apply to the given node.
*
* <p>This method will be invoked each time the {@link LoadBalancingPolicy} processes a topology
* or state change, and will be passed the node being inspected, and the local datacenter name (or
* null if none is defined). If it returns a non-null {@link NodeDistance}, the policy will
* suggest that distance for the node; if it returns null, the policy will assign a default
* distance instead, based on its internal algorithm for computing node distances.
*
* @param node The node to assign a new distance to.
* @param localDc The local datacenter name, if defined, or null otherwise.
* @return The {@link NodeDistance} to assign to the node, or null to let the policy decide.
*/
@Nullable
NodeDistance evaluateDistance(@NonNull Node node, @Nullable String localDc);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
package com.datastax.oss.driver.api.core.session;

import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.NodeFilterToDistanceEvaluatorAdapter;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.function.Predicate;

Expand All @@ -52,6 +55,7 @@ public static Builder builder() {
private final RequestTracker requestTracker;
private final Map<String, String> localDatacenters;
private final Map<String, Predicate<Node>> nodeFilters;
private final Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators;
private final ClassLoader classLoader;
private final AuthProvider authProvider;
private final SslEngineFactory sslEngineFactory;
Expand All @@ -69,6 +73,7 @@ private ProgrammaticArguments(
@Nullable RequestTracker requestTracker,
@NonNull Map<String, String> localDatacenters,
@NonNull Map<String, Predicate<Node>> nodeFilters,
@NonNull Map<String, NodeDistanceEvaluator> nodeDistanceEvaluators,
@Nullable ClassLoader classLoader,
@Nullable AuthProvider authProvider,
@Nullable SslEngineFactory sslEngineFactory,
Expand All @@ -85,6 +90,7 @@ private ProgrammaticArguments(
this.requestTracker = requestTracker;
this.localDatacenters = localDatacenters;
this.nodeFilters = nodeFilters;
this.nodeDistanceEvaluators = nodeDistanceEvaluators;
this.classLoader = classLoader;
this.authProvider = authProvider;
this.sslEngineFactory = sslEngineFactory;
Expand Down Expand Up @@ -122,10 +128,17 @@ public Map<String, String> getLocalDatacenters() {
}

@NonNull
@Deprecated
@SuppressWarnings("DeprecatedIsStillUsed")
public Map<String, Predicate<Node>> getNodeFilters() {
return nodeFilters;
}

@NonNull
public Map<String, NodeDistanceEvaluator> getNodeDistanceEvaluators() {
return nodeDistanceEvaluators;
}

@Nullable
public ClassLoader getClassLoader() {
return classLoader;
Expand Down Expand Up @@ -180,6 +193,8 @@ public static class Builder {
private ImmutableMap.Builder<String, String> localDatacentersBuilder = ImmutableMap.builder();
private final ImmutableMap.Builder<String, Predicate<Node>> nodeFiltersBuilder =
ImmutableMap.builder();
private final ImmutableMap.Builder<String, NodeDistanceEvaluator>
nodeDistanceEvaluatorsBuilder = ImmutableMap.builder();
private ClassLoader classLoader;
private AuthProvider authProvider;
private SslEngineFactory sslEngineFactory;
Expand Down Expand Up @@ -236,16 +251,42 @@ public Builder withLocalDatacenters(Map<String, String> localDatacenters) {
}

@NonNull
public Builder withNodeDistanceEvaluator(
@NonNull String profileName, @NonNull NodeDistanceEvaluator nodeDistanceEvaluator) {
this.nodeDistanceEvaluatorsBuilder.put(profileName, nodeDistanceEvaluator);
return this;
}

@NonNull
public Builder withNodeDistanceEvaluators(
Map<String, NodeDistanceEvaluator> nodeDistanceReporters) {
for (Entry<String, NodeDistanceEvaluator> entry : nodeDistanceReporters.entrySet()) {
this.nodeDistanceEvaluatorsBuilder.put(entry.getKey(), entry.getValue());
}
return this;
}

/**
* @deprecated Use {@link #withNodeDistanceEvaluator(String, NodeDistanceEvaluator)} instead.
*/
@NonNull
@Deprecated
public Builder withNodeFilter(
@NonNull String profileName, @NonNull Predicate<Node> nodeFilter) {
this.nodeFiltersBuilder.put(profileName, nodeFilter);
this.nodeDistanceEvaluatorsBuilder.put(
profileName, new NodeFilterToDistanceEvaluatorAdapter(nodeFilter));
return this;
}

/** @deprecated Use {@link #withNodeDistanceEvaluators(Map)} instead. */
@NonNull
@Deprecated
public Builder withNodeFilters(Map<String, Predicate<Node>> nodeFilters) {
for (Map.Entry<String, Predicate<Node>> entry : nodeFilters.entrySet()) {
this.nodeFiltersBuilder.put(entry.getKey(), entry.getValue());
this.nodeDistanceEvaluatorsBuilder.put(
entry.getKey(), new NodeFilterToDistanceEvaluatorAdapter(entry.getValue()));
}
return this;
}
Expand Down Expand Up @@ -313,6 +354,7 @@ public ProgrammaticArguments build() {
requestTracker,
localDatacentersBuilder.build(),
nodeFiltersBuilder.build(),
nodeDistanceEvaluatorsBuilder.build(),
classLoader,
authProvider,
sslEngineFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
Expand Down Expand Up @@ -389,6 +390,36 @@ public SelfT withLocalDatacenter(@NonNull String localDatacenter) {
return withLocalDatacenter(DriverExecutionProfile.DEFAULT_NAME, localDatacenter);
}

/**
* Adds a custom {@link NodeDistanceEvaluator} for a particular execution profile. This assumes
* that you're also using a dedicated load balancing policy for that profile.
*
* <p>Node distance evaluators are honored by all the driver built-in load balancing policies. If
* you use a custom policy implementation however, you'll need to explicitly invoke the evaluator
* whenever appropriate.
*
* <p>If an evaluator is specified programmatically with this method, it overrides the
* configuration (that is, the {@code load-balancing-policy.evaluator.class} option will be
* ignored).
*
* @see #withNodeDistanceEvaluator(NodeDistanceEvaluator)
*/
@NonNull
public SelfT withNodeDistanceEvaluator(
@NonNull String profileName, @NonNull NodeDistanceEvaluator nodeDistanceEvaluator) {
this.programmaticArgumentsBuilder.withNodeDistanceEvaluator(profileName, nodeDistanceEvaluator);
return self;
}

/**
* Alias to {@link #withNodeDistanceEvaluator(String, NodeDistanceEvaluator)} for the default
* profile.
*/
@NonNull
public SelfT withNodeDistanceEvaluator(@NonNull NodeDistanceEvaluator nodeDistanceEvaluator) {
return withNodeDistanceEvaluator(DriverExecutionProfile.DEFAULT_NAME, nodeDistanceEvaluator);
}

/**
* Adds a custom filter to include/exclude nodes for a particular execution profile. This assumes
* that you're also using a dedicated load balancing policy for that profile.
Expand All @@ -398,21 +429,60 @@ public SelfT withLocalDatacenter(@NonNull String localDatacenter) {
* policy will suggest distance IGNORED (meaning the driver won't ever connect to it if all
* policies agree), and never included in any query plan.
*
* <p>Note that this behavior is implemented in the default load balancing policy. If you use a
* custom policy implementation, you'll need to explicitly invoke the filter.
* <p>Note that this behavior is implemented in the driver built-in load balancing policies. If
* you use a custom policy implementation, you'll need to explicitly invoke the filter.
*
* <p>If the filter is specified programmatically with this method, it overrides the configuration
* (that is, the {@code load-balancing-policy.filter.class} option will be ignored).
*
* <p><strong>This method has been deprecated in favor of {@link
* #withNodeDistanceEvaluator(String, NodeDistanceEvaluator)}</strong>. If you were using node
* filters, you can easily replace your filters with the following implementation of {@link
* NodeDistanceEvaluator}:
*
* <pre>{@code
* public class NodeFilterToDistanceEvaluatorAdapter implements NodeDistanceEvaluator {
*
* private final Predicate<Node> nodeFilter;
*
* public NodeFilterToDistanceEvaluatorAdapter(Predicate<Node> nodeFilter) {
* this.nodeFilter = nodeFilter;
* }
*
* public NodeDistance evaluateDistance(Node node, String localDc) {
* return nodeFilter.test(node) ? null : NodeDistance.IGNORED;
* }
* }
* }</pre>
*
* The same can be achieved using a lambda + closure:
*
* <pre>{@code
* Predicate<Node> nodeFilter = ...
* NodeDistanceEvaluator evaluator =
* (node, localDc) -> nodeFilter.test(node) ? null : NodeDistance.IGNORED;
* }</pre>
*
* @see #withNodeFilter(Predicate)
* @deprecated Use {@link #withNodeDistanceEvaluator(String, NodeDistanceEvaluator)} instead.
*/
@Deprecated
@NonNull
public SelfT withNodeFilter(@NonNull String profileName, @NonNull Predicate<Node> nodeFilter) {
this.programmaticArgumentsBuilder.withNodeFilter(profileName, nodeFilter);
return self;
}

/** Alias to {@link #withNodeFilter(String, Predicate)} for the default profile. */
/**
* Alias to {@link #withNodeFilter(String, Predicate)} for the default profile.
*
* <p><strong>This method has been deprecated in favor of {@link
* #withNodeDistanceEvaluator(NodeDistanceEvaluator)}</strong>. See the javadocs of {@link
* #withNodeFilter(String, Predicate)} to understand how to migrate your legacy node filters.
*
* @deprecated Use {@link #withNodeDistanceEvaluator(NodeDistanceEvaluator)} instead.
*/
@Deprecated
@NonNull
public SelfT withNodeFilter(@NonNull Predicate<Node> nodeFilter) {
return withNodeFilter(DriverExecutionProfile.DEFAULT_NAME, nodeFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
Expand Down Expand Up @@ -223,7 +224,7 @@ public class DefaultDriverContext implements InternalDriverContext {
private final SchemaChangeListener schemaChangeListenerFromBuilder;
private final RequestTracker requestTrackerFromBuilder;
private final Map<String, String> localDatacentersFromBuilder;
private final Map<String, Predicate<Node>> nodeFiltersFromBuilder;
private final Map<String, NodeDistanceEvaluator> nodeDistanceEvaluatorsFromBuilder;
private final ClassLoader classLoader;
private final InetSocketAddress cloudProxyAddress;
private final LazyReference<RequestLogFormatter> requestLogFormatterRef =
Expand Down Expand Up @@ -275,7 +276,7 @@ public DefaultDriverContext(
"sslEngineFactory",
() -> buildSslEngineFactory(programmaticArguments.getSslEngineFactory()),
cycleDetector);
this.nodeFiltersFromBuilder = programmaticArguments.getNodeFilters();
this.nodeDistanceEvaluatorsFromBuilder = programmaticArguments.getNodeDistanceEvaluators();
this.classLoader = programmaticArguments.getClassLoader();
this.cloudProxyAddress = programmaticArguments.getCloudProxyAddress();
this.startupClientId = programmaticArguments.getStartupClientId();
Expand Down Expand Up @@ -908,8 +909,8 @@ public String getLocalDatacenter(@NonNull String profileName) {

@Nullable
@Override
public Predicate<Node> getNodeFilter(@NonNull String profileName) {
return nodeFiltersFromBuilder.get(profileName);
public NodeDistanceEvaluator getNodeDistanceEvaluator(@NonNull String profileName) {
return nodeDistanceEvaluatorsFromBuilder.get(profileName);
}

@Nullable
Expand Down
Loading

0 comments on commit 130fd25

Please sign in to comment.