Skip to content

Commit

Permalink
Cache refresh for container recreate with same id (#12747)
Browse files Browse the repository at this point in the history
* fix for delete and recreate collection with same name

Co-authored-by: Annie Liang <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and Annie Liang authored Jul 7, 2020
1 parent da149bb commit 00e5096
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
Expand Down Expand Up @@ -38,6 +39,17 @@ static public <T> Mono<T> executeRetry(Callable<Mono<T>> callbackMethod,
}).retryWhen(RetryUtils.toRetryWhenFunc(retryPolicy));
}

static public <T> Flux<T> fluxExecuteRetry(Callable<Flux<T>> callbackMethod, IRetryPolicy retryPolicy) {

return Flux.defer(() -> {
try {
return callbackMethod.call();
} catch (Exception e) {
return Flux.error(e);
}
}).retryWhen(RetryUtils.toRetryWhenFunc(retryPolicy));
}

static public <T> Mono<T> executeAsync(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod, IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public InvalidPartitionExceptionRetryPolicy(RxCollectionCache collectionCache,
@Override
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
this.request = request;
this.nextPolicy.onBeforeSendRequest(request);
if (this.nextPolicy != null) {
this.nextPolicy.onBeforeSendRequest(request);
}
}

@Override
Expand All @@ -56,7 +58,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
// TODO: this is blocking. is that fine?
if(this.cosmosQueryRequestOptions != null) {
this.clientCollectionCache.refresh(
BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics),
this.request != null ? BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics) : null,
collectionLink,
ModelBridgeInternal.getPropertiesFromQueryRequestOptions(this.cosmosQueryRequestOptions));
} else {
Expand All @@ -73,6 +75,9 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
}
}

return this.nextPolicy.shouldRetry(e);
if (this.nextPolicy != null) {
return this.nextPolicy.shouldRetry(e);
}
return Mono.just(ShouldRetryResult.error(e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,21 @@ static public <T> Mono<T> inlineIfPossibleAsObs(Callable<Mono<T>> function, IRet
return BackoffRetryUtility.executeRetry(() -> function.call(), retryPolicy);
}
}

static public <T> Flux<T> fluxInlineIfPossibleAsObs(Callable<Flux<T>> function, IRetryPolicy retryPolicy) {

if (retryPolicy == null) {
// shortcut
return Flux.defer(() -> {
try {
return function.call();
} catch (Exception e) {
return Flux.error(e);
}
});

} else {
return BackoffRetryUtility.fluxExecuteRetry(() -> function.call(), retryPolicy);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,18 +560,42 @@ private String parentResourceLinkToQueryLink(String parentResouceLink, ResourceT
}

private <T extends Resource> Flux<FeedResponse<T>> createQuery(
String parentResourceLink,
String parentResourceLink,
SqlQuerySpec sqlQuery,
CosmosQueryRequestOptions options,
Class<T> klass,
ResourceType resourceTypeEnum) {

String resourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum);
UUID activityId = Utils.randomUUID();
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this);

// Trying to put this logic as low as the query pipeline
// Since for parallelQuery, each partition will have its own request, so at this point, there will be no request associate with this retry policy.
// For default document context, it already wired up InvalidPartitionExceptionRetry, but there is no harm to wire it again here
InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
this.collectionCache,
null,
resourceLink,
options);

return ObservableHelper.fluxInlineIfPossibleAsObs(
() -> createQueryInternal(resourceLink, sqlQuery, options, klass, resourceTypeEnum, queryClient, activityId),
invalidPartitionExceptionRetryPolicy);
}

private <T extends Resource> Flux<FeedResponse<T>> createQueryInternal(
String resourceLink,
SqlQuerySpec sqlQuery,
CosmosQueryRequestOptions options,
Class<T> klass,
ResourceType resourceTypeEnum) {
ResourceType resourceTypeEnum,
IDocumentQueryClient queryClient,
UUID activityId) {

String queryResourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum);

UUID activityId = Utils.randomUUID();
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this);
Flux<? extends IDocumentQueryExecutionContext<T>> executionContext =
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, queryResourceLink, false, activityId);
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, resourceLink, false, activityId);

return executionContext.flatMap(iDocumentQueryExecutionContext -> {
QueryInfo queryInfo = null;
if (iDocumentQueryExecutionContext instanceof PipelinedDocumentQueryExecutionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosResponse;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.CosmosItemResponseValidator;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.implementation.TestConfigurations;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -65,17 +64,6 @@ public Object[][] crudArgProvider() {
};
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private InternalObjectNode getDocumentDefinition(String documentId) {
final String uuid = UUID.randomUUID().toString();
return new InternalObjectNode(String.format("{ "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -102,17 +100,6 @@ public void replaceContainer_withContentResponseOnWriteDisabled() throws Excepti

}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private void validateContainerResponse(CosmosContainerProperties containerProperties,
CosmosContainerResponse createResponse) {
// Basic validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,23 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.util.CosmosPagedIterable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -53,19 +51,6 @@ public void afterClass() {
safeCloseSyncClient(client);
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(
collectionName,
partitionKeyDef);

return collectionDefinition;
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void createContainer_withProperties() throws Exception {
String collectionName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -106,17 +103,6 @@ public void deleteItem_withContentResponseOnWriteDisabled() throws Exception {
validateMinimalItemResponse(properties, deleteResponse, false);
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

return new CosmosContainerProperties(
collectionName,
partitionKeyDef);
}

private InternalObjectNode getDocumentDefinition(String documentId) {
final String uuid = UUID.randomUUID().toString();
final InternalObjectNode properties =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,6 @@ public Object[][] collectionCrudArgProvider() {
};
}

private CosmosContainerProperties getCollectionDefinition(String collectionName) {
PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
ArrayList<String> paths = new ArrayList<String>();
paths.add("/mypk");
partitionKeyDef.setPaths(paths);

CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(
collectionName,
partitionKeyDef);

return collectionDefinition;
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "collectionCrudArgProvider")
public void createCollection(String collectionName) throws InterruptedException {
CosmosContainerProperties collectionDefinition = getCollectionDefinition(collectionName);
Expand Down
Loading

0 comments on commit 00e5096

Please sign in to comment.