From 69c14d27f3b9fd4d0ab5b7ba6b0741fbadccd2d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 16:26:49 +0100 Subject: [PATCH] Document locator connection count setting --- src/docs/asciidoc/api.adoc | 10 ++++++- .../rabbitmq/stream/EnvironmentBuilder.java | 30 +++++++++++++++++-- .../stream/impl/StreamEnvironment.java | 7 ++++- .../stream/impl/StreamEnvironmentBuilder.java | 9 +++--- .../stream/docs/EnvironmentUsage.java | 1 + .../stream/impl/RecoveryClusterTest.java | 6 ++-- 6 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 30af7553cd..c075f91453 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -234,6 +234,10 @@ Used as a prefix for connection names. |Contract to change resolved node address to connect to. |Pass-through (no-op) +|`locatorConnectionCount` +|Number of locator connections to maintain (for metadata search) +|The smaller of the number of URIs and 3. + |`tls` |Configuration helper for TLS. |TLS is enabled if a `rabbitmq-stream+tls` URI is provided. @@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver] <1> Set the load balancer address <2> Use load balancer address for initial connection <3> Ignore metadata hints, always use load balancer +<4> Set the number of locator connections to maintain -The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround]. +Note the example above sets the number of locator connections the environment maintains. +Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to). +The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment. +This is why it is recommended to set the value explicitly, 3 being a good default. ==== Managing Streams diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 64010d4d51..c5c5d3b624 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -16,6 +16,7 @@ import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.compression.CompressionCodecFactory; +import com.rabbitmq.stream.impl.StreamEnvironmentBuilder; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.sasl.CredentialsProvider; import com.rabbitmq.stream.sasl.SaslConfiguration; @@ -62,14 +63,15 @@ public interface EnvironmentBuilder { * An {@link AddressResolver} to potentially change resolved node address to connect to. * *

Applications can use this abstraction to make sure connection attempts ignore metadata hints - * and always go to a single point like a load balancer. + * and always go to a single point like a load balancer. Consider setting {@link + * #locatorConnectionCount(int)} when using a load balancer. * *

The default implementation does not perform any logic, it just returns the passed-in * address. * *

The default implementation is overridden automatically if the following conditions are * met: the host to connect to is localhost, the user is guest, and no - * address resolver has been provided. The client will then always tries to connect to + * address resolver has been provided. The client will then always try to connect to * localhost to facilitate local development. Just provide a pass-through address resolver * to avoid this behavior, e.g.: * @@ -79,10 +81,11 @@ public interface EnvironmentBuilder { * .build(); * * - * @param addressResolver + * @param addressResolver the address resolver * @return this builder instance * @see "Connecting to * Streams" blog post + * @see #locatorConnectionCount(int) */ EnvironmentBuilder addressResolver(AddressResolver addressResolver); @@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( */ EnvironmentBuilder forceLeaderForProducers(boolean forceLeader); + /** + * Set the expected number of "locator" connections to maintain. + * + *

Locator connections are used to perform infrastructure-related operations (e.g. looking up + * the topology of a stream to find an appropriate node to connect to). + * + *

It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller + * of the number of passed-in URIs and 3 by default (see {@link #uris(List)}). + * + *

The number of locator connections should be explicitly set when a load balancer is used, as + * the environment cannot know the number of cluster nodes in this case (the only URI set is the + * one of the load balancer). + * + * @param locatorConnectionCount number of expected locator connections + * @return this builder instance + * @see #uris(List) + * @see #addressResolver(AddressResolver) + * @since 0.21.0 + */ + StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount); + /** * Create the {@link Environment} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 53df31709a..c817b1c33d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -182,7 +182,12 @@ class StreamEnvironment implements Environment { this.addressResolver = addressResolverToUse; - int locatorCount = Math.max(this.addresses.size(), expectedLocatorCount); + int locatorCount; + if (expectedLocatorCount > 0) { + locatorCount = expectedLocatorCount; + } else { + locatorCount = Math.min(this.addresses.size(), 3); + } LOGGER.debug("Using {} locator connection(s)", locatorCount); List lctrs = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index f17446e985..0386ec79cd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -70,7 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private ObservationCollector observationCollector = ObservationCollector.NO_OP; private Duration producerNodeRetryDelay = Duration.ofMillis(500); private Duration consumerNodeRetryDelay = Duration.ofMillis(1000); - private int locatorCount = 1; + private int locatorConnectionCount = -1; public StreamEnvironmentBuilder() {} @@ -316,8 +316,9 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) return this; } - StreamEnvironmentBuilder locatorCount(int locatorCount) { - this.locatorCount = locatorCount; + @Override + public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) { + this.locatorConnectionCount = locatorCount; return this; } @@ -356,7 +357,7 @@ public Environment build() { this.forceLeaderForProducers, this.producerNodeRetryDelay, this.consumerNodeRetryDelay, - this.locatorCount); + this.locatorConnectionCount); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java index 72a32daa10..d42e6662e4 100644 --- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java @@ -97,6 +97,7 @@ void addressResolver() throws Exception { .host(entryPoint.host()) // <2> .port(entryPoint.port()) // <2> .addressResolver(address -> entryPoint) // <3> + .locatorConnectionCount(3) // <4> .build(); // end::address-resolver[] } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index b6476670fd..da098188c5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -112,10 +112,10 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru environmentBuilder .host(LOAD_BALANCER_ADDRESS.host()) .port(LOAD_BALANCER_ADDRESS.port()) - .addressResolver(addr -> LOAD_BALANCER_ADDRESS); + .addressResolver(addr -> LOAD_BALANCER_ADDRESS) + .forceLeaderForProducers(forceLeader) + .locatorConnectionCount(URIS.size()); Duration nodeRetryDelay = Duration.ofMillis(100); - environmentBuilder.forceLeaderForProducers(forceLeader); - ((StreamEnvironmentBuilder) environmentBuilder).locatorCount(URIS.size()); // to make the test faster ((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay); ((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay);