Skip to content

Commit

Permalink
ConnectionStateListener-improvement1 (Azure#16647)
Browse files Browse the repository at this point in the history
* ConnectionStateListener Improvement

Co-authored-by: Annie Liang <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and Annie Liang authored Oct 30, 2020
1 parent 269098f commit 9496334
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;

import java.net.URI;
import java.util.Locale;
import java.util.Objects;

Expand Down Expand Up @@ -53,6 +55,10 @@ public String getProtocolScheme() {
return this.protocol.scheme();
}

public URI getServerKey() {
return RntbdUtils.getServerKey(this.physicalUri.getURI());
}

@Override
public String toString() {
return "AddressInformation{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.Set;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.function.Function;

Expand Down Expand Up @@ -64,11 +64,6 @@ public void initializeCaches(
this.collectionRoutingMapCache = collectionRoutingMapCache;
}

@Override
public void remove(RxDocumentServiceRequest request, Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet) {
throw new NotImplementedException("remove() is not supported in AddressResolver");
}

public Mono<AddressInformation[]> resolveAsync(
RxDocumentServiceRequest request,
boolean forceRefreshPartitionAddresses) {
Expand All @@ -89,6 +84,11 @@ public Mono<AddressInformation[]> resolveAsync(
});
}

@Override
public void updateAddresses(RxDocumentServiceRequest request, URI serverKey) {
throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
}

private static boolean isSameCollection(PartitionKeyRange initiallyResolved, PartitionKeyRange newlyResolved) {
if (initiallyResolved == null) {
throw new IllegalArgumentException("parent");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -84,14 +85,18 @@ public class GatewayAddressCache implements IAddressCache {
private volatile Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterPartitionAddressCache;
private volatile Instant suboptimalMasterPartitionTimestamp;

private final ConcurrentHashMap<URI, Set<PartitionKeyRangeIdentity>> serverPartitionAddressToPkRangeIdMap;
private final boolean tcpConnectionEndpointRediscoveryEnabled;

public GatewayAddressCache(
DiagnosticsClientContext clientContext,
URI serviceEndpoint,
Protocol protocol,
IAuthorizationTokenProvider tokenProvider,
UserAgentContainer userAgent,
HttpClient httpClient,
long suboptimalPartitionForceRefreshIntervalInSeconds) {
long suboptimalPartitionForceRefreshIntervalInSeconds,
boolean tcpConnectionEndpointRediscoveryEnabled) {
this.clientContext = clientContext;
try {
this.addressEndpoint = new URL(serviceEndpoint.toURL(), Paths.ADDRESS_PATH_SEGMENT).toURI();
Expand Down Expand Up @@ -124,6 +129,9 @@ public GatewayAddressCache(

// Set requested API version header for version enforcement.
defaultRequestHeaders.put(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);

this.serverPartitionAddressToPkRangeIdMap = new ConcurrentHashMap<>();
this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled;
}

public GatewayAddressCache(
Expand All @@ -132,26 +140,40 @@ public GatewayAddressCache(
Protocol protocol,
IAuthorizationTokenProvider tokenProvider,
UserAgentContainer userAgent,
HttpClient httpClient) {
HttpClient httpClient,
boolean tcpConnectionEndpointRediscoveryEnabled) {
this(clientContext,
serviceEndpoint,
protocol,
tokenProvider,
userAgent,
httpClient,
DefaultSuboptimalPartitionForceRefreshIntervalInSeconds);
DefaultSuboptimalPartitionForceRefreshIntervalInSeconds,
tcpConnectionEndpointRediscoveryEnabled);
}

@Override
public void removeAddress(final PartitionKeyRangeIdentity partitionKeyRangeIdentity) {
public void updateAddresses(final URI serverKey) {

Objects.requireNonNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity");
Objects.requireNonNull(serverKey, "expected non-null serverKey");

if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
this.masterPartitionAddressCache = null;
if (this.tcpConnectionEndpointRediscoveryEnabled) {
this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (uri, partitionKeyRangeIdentitySet) -> {

for (PartitionKeyRangeIdentity partitionKeyRangeIdentity : partitionKeyRangeIdentitySet) {
if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
this.masterPartitionAddressCache = null;
} else {
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
}
}

return null;
});
} else {
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
}

}

@Override
Expand Down Expand Up @@ -622,14 +644,39 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
}

private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
logger.debug("toPartitionAddressAndRange");
if (logger.isDebugEnabled()) {
logger.debug("toPartitionAddressAndRange");
}

Address address = addresses.get(0);
PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(collectionRid, address.getParitionKeyRangeId());

AddressInformation[] addressInfos =
addresses.stream().map(addr ->
GatewayAddressCache.toAddressInformation(addr)
).collect(Collectors.toList()).toArray(new AddressInformation[addresses.size()]);
return Pair.of(new PartitionKeyRangeIdentity(collectionRid, address.getParitionKeyRangeId()), addressInfos);

if (this.tcpConnectionEndpointRediscoveryEnabled) {
for (AddressInformation addressInfo : addressInfos) {
if (logger.isDebugEnabled()) {
logger.debug(
"Added address to serverPartitionAddressToPkRangeIdMap: ({\"partitionKeyRangeIdentity\":{},\"address\":{}})",
partitionKeyRangeIdentity,
addressInfo);
}

this.serverPartitionAddressToPkRangeIdMap.compute(addressInfo.getServerKey(), (serverKey, partitionKeyRangeIdentitySet) -> {
if (partitionKeyRangeIdentitySet == null) {
partitionKeyRangeIdentitySet = ConcurrentHashMap.newKeySet();
}

partitionKeyRangeIdentitySet.add(partitionKeyRangeIdentity);
return partitionKeyRangeIdentitySet;
});
}
}

return Pair.of(partitionKeyRangeIdentity, addressInfos);
}

private static AddressInformation toAddressInformation(Address address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;
Expand All @@ -28,11 +30,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class GlobalAddressResolver implements IAddressResolver {
private static final Logger logger = LoggerFactory.getLogger(GlobalAddressResolver.class);

private final static int MaxBackupReadRegions = 3;
private final DiagnosticsClientContext diagnosticsClientContext;
private final GlobalEndpointManager endpointManager;
Expand All @@ -44,6 +47,7 @@ public class GlobalAddressResolver implements IAddressResolver {
private final int maxEndpoints;
private final GatewayServiceConfigurationReader serviceConfigReader;
final Map<URI, EndpointCache> addressCacheByEndpoint;
private final boolean tcpConnectionEndpointRediscoveryEnabled;

private HttpClient httpClient;

Expand All @@ -67,6 +71,7 @@ public GlobalAddressResolver(
this.collectionCache = collectionCache;
this.routingMapProvider = routingMapProvider;
this.serviceConfigReader = serviceConfigReader;
this.tcpConnectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled();

int maxBackupReadEndpoints = (connectionPolicy.isReadRequestsFallbackEnabled()) ? GlobalAddressResolver.MaxBackupReadRegions : 0;
this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write getEndpoint (during failover)
Expand Down Expand Up @@ -101,22 +106,22 @@ Mono<Void> openAsync(DocumentCollection collection) {
}

@Override
public void remove(
final RxDocumentServiceRequest request,
final Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet) {
public void updateAddresses(final RxDocumentServiceRequest request, final URI serverKey) {

Objects.requireNonNull(request, "expected non-null request");
Objects.requireNonNull(partitionKeyRangeIdentitySet, "expected non-null partitionKeyRangeIdentitySet");

if (partitionKeyRangeIdentitySet.size() > 0) {
Objects.requireNonNull(serverKey, "expected non-null serverKey");

URI addressResolverURI = this.endpointManager.resolveServiceEndpoint(request);
if (this.tcpConnectionEndpointRediscoveryEnabled) {
URI serviceEndpoint = this.endpointManager.resolveServiceEndpoint(request);
this.addressCacheByEndpoint.computeIfPresent(serviceEndpoint, (ignored, endpointCache) -> {

this.addressCacheByEndpoint.computeIfPresent(addressResolverURI, (ignored, endpointCache) -> {
final GatewayAddressCache addressCache = endpointCache.addressCache;
partitionKeyRangeIdentitySet.forEach(partitionKeyRangeIdentity -> addressCache.removeAddress(partitionKeyRangeIdentity));
addressCache.updateAddresses(serverKey);

return endpointCache;
});
} else {
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
}
}

Expand All @@ -139,7 +144,14 @@ private IAddressResolver getAddressResolver(RxDocumentServiceRequest rxDocumentS

private EndpointCache getOrAddEndpoint(URI endpoint) {
EndpointCache endpointCache = this.addressCacheByEndpoint.computeIfAbsent(endpoint , key -> {
GatewayAddressCache gatewayAddressCache = new GatewayAddressCache(this.diagnosticsClientContext, endpoint, protocol, this.tokenProvider, this.userAgentContainer, this.httpClient);
GatewayAddressCache gatewayAddressCache = new GatewayAddressCache(
this.diagnosticsClientContext,
endpoint,
protocol,
this.tokenProvider,
this.userAgentContainer,
this.httpClient,
this.tcpConnectionEndpointRediscoveryEnabled);
AddressResolver addressResolver = new AddressResolver();
addressResolver.initializeCaches(this.collectionCache, this.routingMapProvider, gatewayAddressCache);
EndpointCache cache = new EndpointCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import reactor.core.publisher.Mono;

import java.net.URI;

public interface IAddressCache {

/**
* Removes the physical address associated with the given {@link PartitionKeyRangeIdentity partition key range identity}
* Update the physical address of the {@link PartitionKeyRangeIdentity partition key range identity} associated to the serverKey.
*
*
*/
void removeAddress(PartitionKeyRangeIdentity partitionKeyRangeIdentity);
void updateAddresses(URI serverKey);

/**
* Resolves physical addresses by either PartitionKeyRangeIdentity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import reactor.core.publisher.Mono;

import java.util.Set;
import java.net.URI;

public interface IAddressResolver {

void remove(RxDocumentServiceRequest request, Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet);

Mono<AddressInformation[]> resolveAsync(
RxDocumentServiceRequest request,
boolean forceRefreshPartitionAddresses);

void updateAddresses(RxDocumentServiceRequest request, URI serverKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ public Builder(ConnectionPolicy connectionPolicy) {

this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
this.connectionAcquisitionTimeout = DEFAULT_OPTIONS.connectionAcquisitionTimeout;
this.connectionEndpointRediscoveryEnabled = DEFAULT_OPTIONS.connectionEndpointRediscoveryEnabled;
this.connectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled();
this.connectTimeout = connectionPolicy.getConnectTimeout();
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
this.idleChannelTimerResolution = DEFAULT_OPTIONS.idleChannelTimerResolution;
Expand Down
Loading

0 comments on commit 9496334

Please sign in to comment.