From 9496334337f48b1ecaeb6ee60c766962153f7e7c Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Fri, 30 Oct 2020 19:59:21 +0000 Subject: [PATCH] ConnectionStateListener-improvement1 (#16647) * ConnectionStateListener Improvement Co-authored-by: Annie Liang --- .../AddressInformation.java | 6 ++ .../directconnectivity/AddressResolver.java | 12 +-- .../GatewayAddressCache.java | 67 +++++++++++--- .../GlobalAddressResolver.java | 34 ++++--- .../directconnectivity/IAddressCache.java | 6 +- .../directconnectivity/IAddressResolver.java | 7 +- .../RntbdTransportClient.java | 2 +- .../rntbd/RntbdConnectionStateListener.java | 70 +-------------- .../rntbd/RntbdEndpoint.java | 2 +- .../rntbd/RntbdServiceEndpoint.java | 26 +----- .../directconnectivity/rntbd/RntbdUtils.java | 22 +++++ .../GatewayAddressCacheTest.java | 88 ++++++++++++++++--- .../RntbdTransportClientTest.java | 2 +- 13 files changed, 209 insertions(+), 135 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java index d3ac3b9140eb..e60e03e94f78 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressInformation.java @@ -4,7 +4,9 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils; +import java.net.URI; import java.util.Locale; import java.util.Objects; @@ -53,6 +55,10 @@ public String getProtocolScheme() { return this.protocol.scheme(); } + public URI getServerKey() { + return RntbdUtils.getServerKey(this.physicalUri.getURI()); + } + @Override public String toString() { return "AddressInformation{" + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java index 9577f1d3ff42..0a5e516617e4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -import java.util.Set; +import java.net.URI; import java.util.concurrent.Callable; import java.util.function.Function; @@ -64,11 +64,6 @@ public void initializeCaches( this.collectionRoutingMapCache = collectionRoutingMapCache; } - @Override - public void remove(RxDocumentServiceRequest request, Set partitionKeyRangeIdentitySet) { - throw new NotImplementedException("remove() is not supported in AddressResolver"); - } - public Mono resolveAsync( RxDocumentServiceRequest request, boolean forceRefreshPartitionAddresses) { @@ -89,6 +84,11 @@ public Mono resolveAsync( }); } + @Override + public void updateAddresses(RxDocumentServiceRequest request, URI serverKey) { + throw new NotImplementedException("updateAddresses() is not supported in AddressResolver"); + } + private static boolean isSameCollection(PartitionKeyRange initiallyResolved, PartitionKeyRange newlyResolved) { if (initiallyResolved == null) { throw new IllegalArgumentException("parent"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index 2cd4ff3e5ea6..84e7f956de91 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -84,6 +85,9 @@ public class GatewayAddressCache implements IAddressCache { private volatile Pair masterPartitionAddressCache; private volatile Instant suboptimalMasterPartitionTimestamp; + private final ConcurrentHashMap> serverPartitionAddressToPkRangeIdMap; + private final boolean tcpConnectionEndpointRediscoveryEnabled; + public GatewayAddressCache( DiagnosticsClientContext clientContext, URI serviceEndpoint, @@ -91,7 +95,8 @@ public GatewayAddressCache( IAuthorizationTokenProvider tokenProvider, UserAgentContainer userAgent, HttpClient httpClient, - long suboptimalPartitionForceRefreshIntervalInSeconds) { + long suboptimalPartitionForceRefreshIntervalInSeconds, + boolean tcpConnectionEndpointRediscoveryEnabled) { this.clientContext = clientContext; try { this.addressEndpoint = new URL(serviceEndpoint.toURL(), Paths.ADDRESS_PATH_SEGMENT).toURI(); @@ -124,6 +129,9 @@ public GatewayAddressCache( // Set requested API version header for version enforcement. defaultRequestHeaders.put(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION); + + this.serverPartitionAddressToPkRangeIdMap = new ConcurrentHashMap<>(); + this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled; } public GatewayAddressCache( @@ -132,26 +140,40 @@ public GatewayAddressCache( Protocol protocol, IAuthorizationTokenProvider tokenProvider, UserAgentContainer userAgent, - HttpClient httpClient) { + HttpClient httpClient, + boolean tcpConnectionEndpointRediscoveryEnabled) { this(clientContext, serviceEndpoint, protocol, tokenProvider, userAgent, httpClient, - DefaultSuboptimalPartitionForceRefreshIntervalInSeconds); + DefaultSuboptimalPartitionForceRefreshIntervalInSeconds, + tcpConnectionEndpointRediscoveryEnabled); } @Override - public void removeAddress(final PartitionKeyRangeIdentity partitionKeyRangeIdentity) { + public void updateAddresses(final URI serverKey) { - Objects.requireNonNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity"); + Objects.requireNonNull(serverKey, "expected non-null serverKey"); - if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { - this.masterPartitionAddressCache = null; + if (this.tcpConnectionEndpointRediscoveryEnabled) { + this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (uri, partitionKeyRangeIdentitySet) -> { + + for (PartitionKeyRangeIdentity partitionKeyRangeIdentity : partitionKeyRangeIdentitySet) { + if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { + this.masterPartitionAddressCache = null; + } else { + this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity); + } + } + + return null; + }); } else { - this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity); + logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here."); } + } @Override @@ -622,14 +644,39 @@ public Mono> getMasterAddressesViaGatewayAsync( } private Pair toPartitionAddressAndRange(String collectionRid, List
addresses) { - logger.debug("toPartitionAddressAndRange"); + if (logger.isDebugEnabled()) { + logger.debug("toPartitionAddressAndRange"); + } + Address address = addresses.get(0); + PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, address.getParitionKeyRangeId()); AddressInformation[] addressInfos = addresses.stream().map(addr -> GatewayAddressCache.toAddressInformation(addr) ).collect(Collectors.toList()).toArray(new AddressInformation[addresses.size()]); - return Pair.of(new PartitionKeyRangeIdentity(collectionRid, address.getParitionKeyRangeId()), addressInfos); + + if (this.tcpConnectionEndpointRediscoveryEnabled) { + for (AddressInformation addressInfo : addressInfos) { + if (logger.isDebugEnabled()) { + logger.debug( + "Added address to serverPartitionAddressToPkRangeIdMap: ({\"partitionKeyRangeIdentity\":{},\"address\":{}})", + partitionKeyRangeIdentity, + addressInfo); + } + + this.serverPartitionAddressToPkRangeIdMap.compute(addressInfo.getServerKey(), (serverKey, partitionKeyRangeIdentitySet) -> { + if (partitionKeyRangeIdentitySet == null) { + partitionKeyRangeIdentitySet = ConcurrentHashMap.newKeySet(); + } + + partitionKeyRangeIdentitySet.add(partitionKeyRangeIdentity); + return partitionKeyRangeIdentitySet; + }); + } + } + + return Pair.of(partitionKeyRangeIdentity, addressInfos); } private static AddressInformation toAddressInformation(Address address) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index 73ab1c998893..a15c8868c77b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -17,6 +17,8 @@ import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.concurrent.Queues; @@ -28,11 +30,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class GlobalAddressResolver implements IAddressResolver { + private static final Logger logger = LoggerFactory.getLogger(GlobalAddressResolver.class); + private final static int MaxBackupReadRegions = 3; private final DiagnosticsClientContext diagnosticsClientContext; private final GlobalEndpointManager endpointManager; @@ -44,6 +47,7 @@ public class GlobalAddressResolver implements IAddressResolver { private final int maxEndpoints; private final GatewayServiceConfigurationReader serviceConfigReader; final Map addressCacheByEndpoint; + private final boolean tcpConnectionEndpointRediscoveryEnabled; private HttpClient httpClient; @@ -67,6 +71,7 @@ public GlobalAddressResolver( this.collectionCache = collectionCache; this.routingMapProvider = routingMapProvider; this.serviceConfigReader = serviceConfigReader; + this.tcpConnectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled(); int maxBackupReadEndpoints = (connectionPolicy.isReadRequestsFallbackEnabled()) ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write getEndpoint (during failover) @@ -101,22 +106,22 @@ Mono openAsync(DocumentCollection collection) { } @Override - public void remove( - final RxDocumentServiceRequest request, - final Set partitionKeyRangeIdentitySet) { + public void updateAddresses(final RxDocumentServiceRequest request, final URI serverKey) { Objects.requireNonNull(request, "expected non-null request"); - Objects.requireNonNull(partitionKeyRangeIdentitySet, "expected non-null partitionKeyRangeIdentitySet"); - - if (partitionKeyRangeIdentitySet.size() > 0) { + Objects.requireNonNull(serverKey, "expected non-null serverKey"); - URI addressResolverURI = this.endpointManager.resolveServiceEndpoint(request); + if (this.tcpConnectionEndpointRediscoveryEnabled) { + URI serviceEndpoint = this.endpointManager.resolveServiceEndpoint(request); + this.addressCacheByEndpoint.computeIfPresent(serviceEndpoint, (ignored, endpointCache) -> { - this.addressCacheByEndpoint.computeIfPresent(addressResolverURI, (ignored, endpointCache) -> { final GatewayAddressCache addressCache = endpointCache.addressCache; - partitionKeyRangeIdentitySet.forEach(partitionKeyRangeIdentity -> addressCache.removeAddress(partitionKeyRangeIdentity)); + addressCache.updateAddresses(serverKey); + return endpointCache; }); + } else { + logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here."); } } @@ -139,7 +144,14 @@ private IAddressResolver getAddressResolver(RxDocumentServiceRequest rxDocumentS private EndpointCache getOrAddEndpoint(URI endpoint) { EndpointCache endpointCache = this.addressCacheByEndpoint.computeIfAbsent(endpoint , key -> { - GatewayAddressCache gatewayAddressCache = new GatewayAddressCache(this.diagnosticsClientContext, endpoint, protocol, this.tokenProvider, this.userAgentContainer, this.httpClient); + GatewayAddressCache gatewayAddressCache = new GatewayAddressCache( + this.diagnosticsClientContext, + endpoint, + protocol, + this.tokenProvider, + this.userAgentContainer, + this.httpClient, + this.tcpConnectionEndpointRediscoveryEnabled); AddressResolver addressResolver = new AddressResolver(); addressResolver.initializeCaches(this.collectionCache, this.routingMapProvider, gatewayAddressCache); EndpointCache cache = new EndpointCache(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java index 0f5875812842..c424ee44cbc5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java @@ -8,14 +8,16 @@ import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import reactor.core.publisher.Mono; +import java.net.URI; + public interface IAddressCache { /** - * Removes the physical address associated with the given {@link PartitionKeyRangeIdentity partition key range identity} + * Update the physical address of the {@link PartitionKeyRangeIdentity partition key range identity} associated to the serverKey. * * */ - void removeAddress(PartitionKeyRangeIdentity partitionKeyRangeIdentity); + void updateAddresses(URI serverKey); /** * Resolves physical addresses by either PartitionKeyRangeIdentity. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java index 77d2ef31306f..656860e955d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java @@ -4,16 +4,15 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.RxDocumentServiceRequest; -import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import reactor.core.publisher.Mono; -import java.util.Set; +import java.net.URI; public interface IAddressResolver { - void remove(RxDocumentServiceRequest request, Set partitionKeyRangeIdentitySet); - Mono resolveAsync( RxDocumentServiceRequest request, boolean forceRefreshPartitionAddresses); + + void updateAddresses(RxDocumentServiceRequest request, URI serverKey); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index 8c2535c7726b..1b631a9437a3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -704,7 +704,7 @@ public Builder(ConnectionPolicy connectionPolicy) { this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize; this.connectionAcquisitionTimeout = DEFAULT_OPTIONS.connectionAcquisitionTimeout; - this.connectionEndpointRediscoveryEnabled = DEFAULT_OPTIONS.connectionEndpointRediscoveryEnabled; + this.connectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled(); this.connectTimeout = connectionPolicy.getConnectTimeout(); this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout(); this.idleChannelTimerResolution = DEFAULT_OPTIONS.idleChannelTimerResolution; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java index 942e351c4ebf..3eb0c29da3e4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java @@ -6,16 +6,12 @@ import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.directconnectivity.IAddressResolver; -import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.time.Instant; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -26,8 +22,6 @@ public class RntbdConnectionStateListener { private final IAddressResolver addressResolver; private final RntbdEndpoint endpoint; - private final Set partitionAddressCache; - private final AtomicBoolean updatingAddressCache = new AtomicBoolean(false); // endregion @@ -36,7 +30,6 @@ public class RntbdConnectionStateListener { public RntbdConnectionStateListener(final IAddressResolver addressResolver, final RntbdEndpoint endpoint) { this.addressResolver = checkNotNull(addressResolver, "expected non-null addressResolver"); this.endpoint = checkNotNull(endpoint, "expected non-null endpoint"); - this.partitionAddressCache = ConcurrentHashMap.newKeySet(); } // endregion @@ -79,51 +72,13 @@ public void onException(final RxDocumentServiceRequest request, Throwable except } } - public void updateConnectionState(final RxDocumentServiceRequest request) { - - checkNotNull("expect non-null request"); - - PartitionKeyRangeIdentity partitionKeyRangeIdentity = this.getPartitionKeyRangeIdentity(request); - checkNotNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity"); - - this.partitionAddressCache.add(partitionKeyRangeIdentity); - - if (logger.isDebugEnabled()) { - logger.debug( - "updateConnectionState({\"time\":{},\"endpoint\":{},\"partitionKeyRangeIdentity\":{}})", - RntbdObjectMapper.toJson(Instant.now()), - RntbdObjectMapper.toJson(endpoint), - RntbdObjectMapper.toJson(partitionKeyRangeIdentity)); - } - } - // endregion // region Privates - private PartitionKeyRangeIdentity getPartitionKeyRangeIdentity(final RxDocumentServiceRequest request) { - checkNotNull(request, "expect non-null request"); - - PartitionKeyRangeIdentity partitionKeyRangeIdentity = request.getPartitionKeyRangeIdentity(); - - if (partitionKeyRangeIdentity == null) { - - final String partitionKeyRange = checkNotNull( - request.requestContext.resolvedPartitionKeyRange, "expected non-null resolvedPartitionKeyRange").getId(); - - final String collectionRid = request.requestContext.resolvedCollectionRid; - - partitionKeyRangeIdentity = collectionRid != null - ? new PartitionKeyRangeIdentity(collectionRid, partitionKeyRange) - : new PartitionKeyRangeIdentity(partitionKeyRange); - } - - return partitionKeyRangeIdentity; - } - private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumentServiceRequest request, final Throwable exception) { - checkNotNull(request, "expected non-null exception"); + checkNotNull(request, "expected non-null request"); checkNotNull(exception, "expected non-null exception"); if (event == RntbdConnectionEvent.READ_EOF) { @@ -137,27 +92,10 @@ private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumen RntbdObjectMapper.toJson(exception)); } - this.updateAddressCache(request); - } - } - } - - private void updateAddressCache(final RxDocumentServiceRequest request) { - try{ - if (this.updatingAddressCache.compareAndSet(false, true)) { - if (logger.isDebugEnabled()) { - logger.debug( - "updateAddressCache ({\"time\":{},\"endpoint\":{},\"partitionAddressCache\":{}})", - RntbdObjectMapper.toJson(Instant.now()), - RntbdObjectMapper.toJson(this.endpoint), - RntbdObjectMapper.toJson(this.partitionAddressCache)); - } - - this.addressResolver.remove(request, this.partitionAddressCache); - this.partitionAddressCache.clear(); + this.addressResolver.updateAddresses(request, this.endpoint.serverKey()); + } else { + logger.warn("Endpoint closed while onConnectionEvent: {}", this.endpoint); } - } finally { - this.updatingAddressCache.set(false); } } // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java index a1ecb71c4475..17645ce6a08e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java @@ -59,7 +59,7 @@ public interface RntbdEndpoint extends AutoCloseable { SocketAddress remoteAddress(); - URI remoteURI(); + URI serverKey(); int requestQueueLength(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java index e1a0685c313d..bcf48fdda31e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.net.SocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.util.UUID; @@ -74,7 +73,7 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint { private final Instant createdTime; private final RntbdMetrics metrics; private final Provider provider; - private final URI remoteURI; + private final URI serverKey; private final SocketAddress remoteAddress; private final RntbdRequestTimer requestTimer; private final Tag tag; @@ -93,20 +92,7 @@ private RntbdServiceEndpoint( final RntbdRequestTimer timer, final URI physicalAddress) { - try { - this.remoteURI = new URI( - physicalAddress.getScheme(), - null, - physicalAddress.getHost(), - physicalAddress.getPort(), - null, - null, - null); - } catch (URISyntaxException error) { - throw new IllegalArgumentException( - lenientFormat("physicalAddress %s cannot be parsed as a server-based authority", physicalAddress), - error); - } + this.serverKey = RntbdUtils.getServerKey(physicalAddress); final Bootstrap bootstrap = new Bootstrap() .channel(NioSocketChannel.class) @@ -116,7 +102,7 @@ private RntbdServiceEndpoint( .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeoutInMillis()) .option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator) .option(ChannelOption.SO_KEEPALIVE, true) - .remoteAddress(this.remoteURI.getHost(), this.remoteURI.getPort()); + .remoteAddress(this.serverKey.getHost(), this.serverKey.getPort()); this.createdTime = Instant.now(); this.channelPool = new RntbdClientChannelPool(this, bootstrap, config); @@ -218,7 +204,7 @@ public SocketAddress remoteAddress() { } @Override - public URI remoteURI() { return this.remoteURI; } + public URI serverKey() { return this.serverKey; } @Override public int requestQueueLength() { @@ -275,10 +261,6 @@ public RntbdRequestRecord request(final RntbdRequestArgs args) { } } - if (this.connectionStateListener != null) { - this.connectionStateListener.updateConnectionState(args.serviceRequest()); - } - this.lastRequestNanoTime.set(args.nanoTimeCreated()); final RntbdRequestRecord record = this.write(args); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdUtils.java index 1c3f7bb84d41..9d46be9acaad 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdUtils.java @@ -9,6 +9,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; +import java.net.URISyntaxException; + +import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat; + public class RntbdUtils { private static final Logger logger = LoggerFactory.getLogger(RntbdUtils.class); @@ -32,4 +37,21 @@ public static int tryGetExecutorTaskQueueSize(EventExecutor eventLoop) { } return -1; } + + public static URI getServerKey(URI physicalAddress) { + try { + return new URI( + physicalAddress.getScheme(), + null, + physicalAddress.getHost(), + physicalAddress.getPort(), + null, + null, + null); + } catch (URISyntaxException error) { + throw new IllegalArgumentException( + lenientFormat("physicalAddress %s cannot be parsed as a server-based authority", physicalAddress), + error); + } + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java index 2133ff642b74..5fafc24e090f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCacheTest.java @@ -101,7 +101,8 @@ public void getServerAddressesViaGateway(List partitionKeyRangeIds, protocol, authorizationTokenProvider, null, - getHttpClient(configs)); + getHttpClient(configs), + false); for (int i = 0; i < 2; i++) { RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, @@ -134,7 +135,8 @@ public void getMasterAddressesViaGatewayAsync(Protocol protocol) throws Exceptio protocol, authorizationTokenProvider, null, - getHttpClient(configs)); + getHttpClient(configs), + false); for (int i = 0; i < 2; i++) { RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Database, @@ -178,7 +180,8 @@ public void tryGetAddresses_ForDataPartitions(String partitionKeyRangeId, String protocol, authorizationTokenProvider, null, - getHttpClient(configs)); + getHttpClient(configs), + false); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, @@ -201,6 +204,59 @@ public void tryGetAddresses_ForDataPartitions(String partitionKeyRangeId, String assertSameAs(addressInfosFromCache, expectedAddresses); } + @Test(groups = { "direct" }, dataProvider = "targetPartitionsKeyRangeAndCollectionLinkParams", timeOut = TIMEOUT) + public void tryGetAddress_OnConnectionEvent_Refresh(String partitionKeyRangeId, String collectionLink, Protocol protocol) throws Exception { + + Configs configs = ConfigsBuilder.instance().withProtocol(protocol).build(); + URI serviceEndpoint = new URI(TestConfigurations.HOST); + IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client; + HttpClientUnderTestWrapper httpClientWrapper = getHttpClientUnderTestWrapper(configs); + + GatewayAddressCache cache = new GatewayAddressCache( + mockDiagnosticsClientContext(), + serviceEndpoint, + protocol, + authorizationTokenProvider, + null, + httpClientWrapper.getSpyHttpClient(), + true); + + RxDocumentServiceRequest req = + RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Document, + collectionLink, + new Database(), new HashMap<>()); + + PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(createdCollection.getResourceId(), partitionKeyRangeId); + boolean forceRefreshPartitionAddresses = false; + + Mono> addressesInfosFromCacheObs = + cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses); + + ArrayList addressInfosFromCache = + Lists.newArrayList(getSuccessResult(addressesInfosFromCacheObs, TIMEOUT).v); + + assertThat(httpClientWrapper.capturedRequests) + .describedAs("getAddress will read addresses from gateway") + .asList().hasSize(1); + + httpClientWrapper.capturedRequests.clear(); + + // for the second request with the same partitionkeyRangeIdentity, the address result should be fetched from the cache + getSuccessResult(cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses), TIMEOUT); + assertThat(httpClientWrapper.capturedRequests) + .describedAs("getAddress should read from cache") + .asList().hasSize(0); + + httpClientWrapper.capturedRequests.clear(); + + // Now emulate onConnectionEvent happened, and the address should be removed from the cache + cache.updateAddresses(addressInfosFromCache.get(0).getServerKey()); + getSuccessResult(cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses), TIMEOUT); + assertThat(httpClientWrapper.capturedRequests) + .describedAs("getAddress will read addresses from gateway after onConnectionEvent") + .asList().hasSize(1); + } + @DataProvider(name = "openAsyncTargetAndTargetPartitionsKeyRangeAndCollectionLinkParams") public Object[][] openAsyncTargetAndPartitionsKeyRangeTargetAndCollectionLinkParams() { return new Object[][] { @@ -228,7 +284,8 @@ public void tryGetAddresses_ForDataPartitions_AddressCachedByOpenAsync_NoHttpReq Protocol.HTTPS, authorizationTokenProvider, null, - httpClientWrapper.getSpyHttpClient()); + httpClientWrapper.getSpyHttpClient(), + false); String collectionRid = createdCollection.getResourceId(); @@ -284,7 +341,8 @@ public void tryGetAddresses_ForDataPartitions_ForceRefresh( Protocol.HTTPS, authorizationTokenProvider, null, - httpClientWrapper.getSpyHttpClient()); + httpClientWrapper.getSpyHttpClient(), + false); String collectionRid = createdCollection.getResourceId(); @@ -343,7 +401,8 @@ public void tryGetAddresses_ForDataPartitions_Suboptimal_Refresh( authorizationTokenProvider, null, httpClientWrapper.getSpyHttpClient(), - suboptimalRefreshTime); + suboptimalRefreshTime, + false); String collectionRid = createdCollection.getResourceId(); @@ -448,7 +507,8 @@ public void tryGetAddresses_ForMasterPartition(Protocol protocol) throws Excepti protocol, authorizationTokenProvider, null, - getHttpClient(configs)); + getHttpClient(configs), + false); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Database, @@ -495,7 +555,8 @@ public void tryGetAddresses_ForMasterPartition_MasterPartitionAddressAlreadyCach authorizationTokenProvider, null, clientWrapper.getSpyHttpClient(), - suboptimalPartitionForceRefreshIntervalInSeconds); + suboptimalPartitionForceRefreshIntervalInSeconds, + false); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Database, @@ -541,7 +602,8 @@ public void tryGetAddresses_ForMasterPartition_ForceRefresh() throws Exception { Protocol.HTTPS, authorizationTokenProvider, null, - clientWrapper.getSpyHttpClient()); + clientWrapper.getSpyHttpClient(), + false); RxDocumentServiceRequest req = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, ResourceType.Database, @@ -592,7 +654,9 @@ public void tryGetAddresses_SuboptimalMasterPartition_NotStaleEnough_NoRefresh() Protocol.HTTPS, authorizationTokenProvider, null, - clientWrapper.getSpyHttpClient(), refreshPeriodInSeconds); + clientWrapper.getSpyHttpClient(), + refreshPeriodInSeconds, + false); GatewayAddressCache spyCache = Mockito.spy(origCache); @@ -683,7 +747,9 @@ public void tryGetAddresses_SuboptimalMasterPartition_Stale_DoRefresh() throws E Protocol.HTTPS, authorizationTokenProvider, null, - clientWrapper.getSpyHttpClient(), refreshPeriodInSeconds); + clientWrapper.getSpyHttpClient(), + refreshPeriodInSeconds, + false); GatewayAddressCache spyCache = Mockito.spy(origCache); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index cb5ac984f3c9..bbe90a3ece24 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -926,7 +926,7 @@ public SocketAddress remoteAddress() { } @Override - public URI remoteURI() { return this.remoteURI; } + public URI serverKey() { return this.remoteURI; } @Override public int requestQueueLength() {