From 957740dbb168bf41261158e2aa5014d320733125 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 16 May 2022 18:30:02 -0700 Subject: [PATCH] Improved diagnostics with new models for StoreResponse, StoreResult and CosmosException (#28620) * Improved diagnostics with new models for StoreResponse, StoreResult and CosmosException * Fixed spot bugs related to storeResult.getException() * Updated query plan cache to ConcurrentHashMap with fixed size of 1000 to start with * Added exception response headers and message to direct and gateway errors. Also added code for throwing any java.lang.Error * Added unit tests for StoreReader and ConsistencyWriter * Disabled StoreReader unit test for error since it is causing other tests to fail. Will investigate later * Commented out the broken test * Reverted StoreReaderTest * Removed mockito-inline * Fixed StoreReaderTest static mocking * Fixed ConsistencyWriterTest static mocking * Code review comments and changelog addition * Fixed some test cases --- .../cosmos/benchmark/ReadMyWriteWorkflow.java | 3 + sdk/cosmos/azure-cosmos/CHANGELOG.md | 3 + sdk/cosmos/azure-cosmos/pom.xml | 6 + .../java/com/azure/cosmos/BridgeInternal.java | 25 ++- .../com/azure/cosmos/CosmosException.java | 26 +-- .../ClientSideRequestStatistics.java | 84 ++++---- .../cosmos/implementation/Exceptions.java | 8 + .../ImplementationBridgeHelpers.java | 1 - .../implementation/RxDocumentClientImpl.java | 6 +- .../RxDocumentServiceResponse.java | 10 +- .../implementation/RxGatewayStoreModel.java | 29 ++- .../cosmos/implementation/TracerProvider.java | 7 +- .../implementation/cpu/CpuMemoryReader.java | 12 ++ .../directconnectivity/ConsistencyWriter.java | 6 + .../DirectBridgeInternal.java | 25 --- .../GatewayAddressCache.java | 10 +- .../directconnectivity/HttpUtils.java | 8 + .../directconnectivity/ResponseUtils.java | 2 +- .../directconnectivity/StoreClient.java | 18 +- .../directconnectivity/StoreReader.java | 5 +- .../directconnectivity/StoreResponse.java | 89 ++------ .../StoreResponseDiagnostics.java | 160 ++++++++++++++ .../directconnectivity/StoreResult.java | 188 +--------------- .../StoreResultDiagnostics.java | 203 ++++++++++++++++++ .../rntbd/RntbdClientChannelHandler.java | 4 +- .../rntbd/RntbdClientChannelPool.java | 19 +- .../rntbd/RntbdMetrics.java | 7 +- .../rntbd/RntbdReporter.java | 3 + .../rntbd/RntbdRequestManager.java | 4 +- .../rntbd/RntbdResponse.java | 2 +- .../DocumentQueryExecutionContextFactory.java | 5 + .../azure/cosmos/util/CosmosPagedFlux.java | 3 +- .../azure/cosmos/CosmosDiagnosticsTest.java | 29 ++- .../com/azure/cosmos/CosmosTracerTest.java | 16 +- .../cosmos/RetryContextOnDiagnosticTest.java | 6 +- .../implementation/StoreResponseBuilder.java | 30 ++- .../batch/CosmosBulkItemResponseTest.java | 4 +- .../TransactionalBatchResponseTests.java | 4 +- .../ConsistencyWriterTest.java | 66 +++++- .../directconnectivity/HttpUtilsTest.java | 6 +- .../directconnectivity/StoreReaderTest.java | 51 +++++ .../directconnectivity/StoreResponseTest.java | 2 +- .../StoreResponseValidator.java | 17 +- .../com/azure/cosmos/rx/DocumentCrudTest.java | 2 +- 44 files changed, 725 insertions(+), 489 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/DirectBridgeInternal.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseDiagnostics.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnostics.java diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java index 1a20600bf5cc2..8e4f333207058 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java @@ -153,6 +153,9 @@ protected void performWorkload(BaseSubscriber baseSubscriber, long i) } catch (Throwable error) { concurrencyControlSemaphore.release(); logger.error("subscription failed due to ", error); + if (error instanceof Error) { + throw (Error) error; + } } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 173f11f6b1cf6..8b54d44a67e22 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,8 +7,11 @@ #### Breaking Changes #### Bugs Fixed +* Fixed bubbling of Errors in case of any `java.lang.Error` - See [PR 28620](https://github.com/Azure/azure-sdk-for-java/pull/28620) #### Other Changes +* Added `exceptionMessage` and `exceptionResponseHeaders` to `CosmosDiagnostics` in case of any exceptions - See [PR 28620](https://github.com/Azure/azure-sdk-for-java/pull/28620) +* Improved performance of `query plan` cache by using `ConcurrentHashMap` with a fixed size of 1000 - See [PR 28537](https://github.com/Azure/azure-sdk-for-java/pull/28537) * Changed 429 (Throttling) retry policy to have an upper bound for the back-off time of 5 seconds - See [PR 28764](https://github.com/Azure/azure-sdk-for-java/pull/28764) ### 4.29.1 (2022-04-27) diff --git a/sdk/cosmos/azure-cosmos/pom.xml b/sdk/cosmos/azure-cosmos/pom.xml index 34b2f11783888..31bcd25553c9e 100644 --- a/sdk/cosmos/azure-cosmos/pom.xml +++ b/sdk/cosmos/azure-cosmos/pom.xml @@ -241,6 +241,12 @@ Licensed under the MIT License. 4.0.0 test + + org.mockito + mockito-inline + 4.0.0 + test + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java index ca4a6034a14b9..460e69112ee09 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java @@ -30,7 +30,9 @@ import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.Warning; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.implementation.directconnectivity.StoreResult; +import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics; @@ -561,18 +563,13 @@ public static ClientSideRequestStatistics getClientSideRequestStatics(CosmosDiag return clientSideRequestStatistics; } - @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static void setGatewayRequestTimelineOnDiagnostics(CosmosDiagnostics cosmosDiagnostics, - RequestTimeline requestTimeline) { - cosmosDiagnostics.clientSideRequestStatistics().setGatewayRequestTimeline(requestTimeline); - } - @Warning(value = INTERNAL_USE_ONLY_WARNING) public static void recordResponse(CosmosDiagnostics cosmosDiagnostics, RxDocumentServiceRequest request, StoreResult storeResult, GlobalEndpointManager globalEndpointManager) { - cosmosDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResult, globalEndpointManager); + StoreResultDiagnostics storeResultDiagnostics = StoreResultDiagnostics.createStoreResultDiagnostics(storeResult); + cosmosDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResultDiagnostics, globalEndpointManager); } @Warning(value = INTERNAL_USE_ONLY_WARNING) @@ -602,9 +599,19 @@ public static SerializationDiagnosticsContext getSerializationDiagnosticsContext public static void recordGatewayResponse(CosmosDiagnostics cosmosDiagnostics, RxDocumentServiceRequest rxDocumentServiceRequest, StoreResponse storeResponse, - CosmosException exception, GlobalEndpointManager globalEndpointManager) { - cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponse, exception, globalEndpointManager); + StoreResponseDiagnostics storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(storeResponse); + cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponseDiagnostics, globalEndpointManager); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static void recordGatewayResponse(CosmosDiagnostics cosmosDiagnostics, + RxDocumentServiceRequest rxDocumentServiceRequest, + CosmosException cosmosException, + GlobalEndpointManager globalEndpointManager) { + StoreResponseDiagnostics storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(cosmosException); + cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponseDiagnostics, globalEndpointManager); + cosmosException.setDiagnostics(cosmosDiagnostics); } @Warning(value = INTERNAL_USE_ONLY_WARNING) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index 5298a4a1203fd..d631cce276f55 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -154,6 +154,7 @@ protected CosmosException(int statusCode, String message, Map re this.statusCode = statusCode; this.responseHeaders = new ConcurrentHashMap<>(); + // Since ConcurrentHashMap only takes non-null entries, so filtering them before putting them in. if (responseHeaders != null) { for (Map.Entry entry: responseHeaders.entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { @@ -553,31 +554,6 @@ void setRntbdPendingRequestQueueSize(int rntbdPendingRequestQueueSize) { public CosmosException createCosmosException(int statusCode, Exception innerException) { return new CosmosException(statusCode, innerException); } - - @Override - public CosmosException createSerializableCosmosException(CosmosException cosmosException) { - if (cosmosException == null) { - return null; - } - CosmosException exception = new CosmosException(cosmosException.statusCode, - cosmosException.cosmosError, cosmosException.getResponseHeaders()); - exception.requestTimeline = cosmosException.requestTimeline; - exception.channelAcquisitionTimeline = cosmosException.channelAcquisitionTimeline; - exception.rntbdChannelTaskQueueSize = cosmosException.rntbdChannelTaskQueueSize; - exception.rntbdEndpointStatistics = cosmosException.rntbdEndpointStatistics; - exception.lsn = cosmosException.lsn; - exception.partitionKeyRangeId = cosmosException.partitionKeyRangeId; - exception.requestUri = cosmosException.requestUri; - exception.resourceAddress = cosmosException.resourceAddress; - exception.requestPayloadLength = cosmosException.requestPayloadLength; - exception.rntbdPendingRequestQueueSize = cosmosException.rntbdPendingRequestQueueSize; - exception.rntbdRequestLength = cosmosException.rntbdRequestLength; - exception.rntbdResponseLength = cosmosException.rntbdResponseLength; - exception.sendingRequestHasStarted = cosmosException.sendingRequestHasStarted; - exception.requestHeaders = null; - exception.cosmosDiagnostics = null; - return exception; - } }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index 0d3d976978404..271f3d4f50de0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -2,12 +2,10 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; -import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; -import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal; -import com.azure.cosmos.implementation.directconnectivity.StoreResponse; -import com.azure.cosmos.implementation.directconnectivity.StoreResult; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; +import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -43,7 +41,6 @@ public class ClientSideRequestStatistics { private Set locationEndpointsContacted; private RetryContext retryContext; private GatewayStatistics gatewayStatistics; - private RequestTimeline gatewayRequestTimeline; private MetadataDiagnosticsContext metadataDiagnosticsContext; private SerializationDiagnosticsContext serializationDiagnosticsContext; @@ -93,13 +90,13 @@ public DiagnosticsClientContext.DiagnosticsClientConfig getDiagnosticsClientConf return diagnosticsClientConfig; } - public void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult, GlobalEndpointManager globalEndpointManager) { + public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnostics storeResultDiagnostics, GlobalEndpointManager globalEndpointManager) { Objects.requireNonNull(request, "request is required and cannot be null."); Instant responseTime = Instant.now(); StoreResponseStatistics storeResponseStatistics = new StoreResponseStatistics(); storeResponseStatistics.requestResponseTimeUTC = responseTime; - storeResponseStatistics.storeResult = StoreResult.createSerializableStoreResult(storeResult); + storeResponseStatistics.storeResult = storeResultDiagnostics; storeResponseStatistics.requestOperationType = request.getOperationType(); storeResponseStatistics.requestResourceType = request.getResourceType(); activityId = request.getActivityId().toString(); @@ -117,8 +114,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResult storeRe this.requestEndTimeUTC = responseTime; } - // TODO (kuthapar): globalEndpointManager != null check is just for safety for hotfix. Remove it after further investigation - if (locationEndPoint != null && globalEndpointManager != null) { + if (locationEndPoint != null) { this.regionsContacted.add(globalEndpointManager.getRegionName(locationEndPoint, request.getOperationType())); this.locationEndpointsContacted.add(locationEndPoint); } @@ -133,8 +129,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResult storeRe } public void recordGatewayResponse( - RxDocumentServiceRequest rxDocumentServiceRequest, StoreResponse storeResponse, - CosmosException exception, GlobalEndpointManager globalEndpointManager) { + RxDocumentServiceRequest rxDocumentServiceRequest, StoreResponseDiagnostics storeResponseDiagnostics, GlobalEndpointManager globalEndpointManager) { Instant responseTime = Instant.now(); synchronized (this) { @@ -148,8 +143,7 @@ public void recordGatewayResponse( } this.recordRetryContextEndTime(); - // TODO (kuthapar): globalEndpointManager != null check is just for safety for hotfix. Remove it after further investigation - if (locationEndPoint != null && globalEndpointManager != null) { + if (locationEndPoint != null) { this.regionsContacted.add(globalEndpointManager.getRegionName(locationEndPoint, rxDocumentServiceRequest.getOperationType())); this.locationEndpointsContacted.add(locationEndPoint); } @@ -159,34 +153,18 @@ public void recordGatewayResponse( this.gatewayStatistics.operationType = rxDocumentServiceRequest.getOperationType(); this.gatewayStatistics.resourceType = rxDocumentServiceRequest.getResourceType(); } - if (storeResponse != null) { - this.gatewayStatistics.statusCode = storeResponse.getStatus(); - this.gatewayStatistics.subStatusCode = DirectBridgeInternal.getSubStatusCode(storeResponse); - this.gatewayStatistics.sessionToken = storeResponse - .getHeaderValue(HttpConstants.HttpHeaders.SESSION_TOKEN); - this.gatewayStatistics.requestCharge = storeResponse - .getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE); - this.gatewayStatistics.requestTimeline = DirectBridgeInternal.getRequestTimeline(storeResponse); - this.gatewayStatistics.partitionKeyRangeId = storeResponse.getPartitionKeyRangeId(); - this.activityId= storeResponse.getHeaderValue(HttpConstants.HttpHeaders.ACTIVITY_ID); - } else if (exception != null) { - this.gatewayStatistics.statusCode = exception.getStatusCode(); - this.gatewayStatistics.subStatusCode = exception.getSubStatusCode(); - this.gatewayStatistics.requestTimeline = this.gatewayRequestTimeline; - this.gatewayStatistics.requestCharge= String.valueOf(exception.getRequestCharge()); - this.activityId=exception.getActivityId(); - } + this.gatewayStatistics.statusCode = storeResponseDiagnostics.getStatusCode(); + this.gatewayStatistics.subStatusCode = storeResponseDiagnostics.getSubStatusCode(); + this.gatewayStatistics.sessionToken = storeResponseDiagnostics.getSessionTokenAsString(); + this.gatewayStatistics.requestCharge = storeResponseDiagnostics.getRequestCharge(); + this.gatewayStatistics.requestTimeline = storeResponseDiagnostics.getRequestTimeline(); + this.gatewayStatistics.partitionKeyRangeId = storeResponseDiagnostics.getPartitionKeyRangeId(); + this.gatewayStatistics.exceptionMessage = storeResponseDiagnostics.getExceptionMessage(); + this.gatewayStatistics.exceptionResponseHeaders = storeResponseDiagnostics.getExceptionResponseHeaders(); + this.activityId = storeResponseDiagnostics.getActivityId(); } } - public void setGatewayRequestTimeline(RequestTimeline transportRequestTimeline) { - this.gatewayRequestTimeline = transportRequestTimeline; - } - - public RequestTimeline getGatewayRequestTimeline() { - return this.gatewayRequestTimeline; - } - public String recordAddressResolutionStart( URI targetEndpoint, boolean forceRefresh, @@ -209,7 +187,7 @@ public String recordAddressResolutionStart( return identifier; } - public void recordAddressResolutionEnd(String identifier, String errorMessage) { + public void recordAddressResolutionEnd(String identifier, String exceptionMessage) { if (StringUtils.isEmpty(identifier)) { return; } @@ -227,7 +205,7 @@ public void recordAddressResolutionEnd(String identifier, String errorMessage) { AddressResolutionStatistics resolutionStatistics = this.addressResolutionStatistics.get(identifier); resolutionStatistics.endTimeUTC = responseTime; - resolutionStatistics.errorMessage = errorMessage; + resolutionStatistics.exceptionMessage = exceptionMessage; resolutionStatistics.inflightRequest = false; } } @@ -297,8 +275,8 @@ public GatewayStatistics getGatewayStatistics() { } public static class StoreResponseStatistics { - @JsonSerialize(using = StoreResult.StoreResultSerializer.class) - private StoreResult storeResult; + @JsonSerialize(using = StoreResultDiagnostics.StoreResultDiagnosticsSerializer.class) + private StoreResultDiagnostics storeResult; @JsonSerialize(using = DiagnosticsInstantSerializer.class) private Instant requestResponseTimeUTC; @JsonSerialize @@ -306,7 +284,7 @@ public static class StoreResponseStatistics { @JsonSerialize private OperationType requestOperationType; - public StoreResult getStoreResult() { + public StoreResultDiagnostics getStoreResult() { return storeResult; } @@ -409,7 +387,7 @@ public static class AddressResolutionStatistics { @JsonSerialize private String targetEndpoint; @JsonSerialize - private String errorMessage; + private String exceptionMessage; @JsonSerialize private boolean forceRefresh; @JsonSerialize @@ -433,8 +411,8 @@ public String getTargetEndpoint() { return targetEndpoint; } - public String getErrorMessage() { - return errorMessage; + public String getExceptionMessage() { + return exceptionMessage; } public boolean isInflightRequest() { @@ -456,9 +434,11 @@ public static class GatewayStatistics { private ResourceType resourceType; private int statusCode; private int subStatusCode; - private String requestCharge; + private double requestCharge; private RequestTimeline requestTimeline; private String partitionKeyRangeId; + private String exceptionMessage; + private String exceptionResponseHeaders; public String getSessionToken() { return sessionToken; @@ -476,7 +456,7 @@ public int getSubStatusCode() { return subStatusCode; } - public String getRequestCharge() { + public double getRequestCharge() { return requestCharge; } @@ -491,6 +471,14 @@ public ResourceType getResourceType() { public String getPartitionKeyRangeId() { return partitionKeyRangeId; } + + public String getExceptionMessage() { + return exceptionMessage; + } + + public String getExceptionResponseHeaders() { + return exceptionResponseHeaders; + } } public static SystemInformation fetchSystemInformation() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java index b8cc0a95ca660..96f041dbe2d4e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java @@ -18,6 +18,14 @@ public static boolean isSubStatusCode(CosmosException e, int subStatus) { return subStatus == e.getSubStatusCode(); } + public static boolean isGone(CosmosException e) { + return isStatusCode(e, HttpConstants.StatusCodes.GONE); + } + + public static boolean isNotFound(CosmosException e) { + return isStatusCode(e, HttpConstants.StatusCodes.NOTFOUND); + } + public static boolean isPartitionSplit(CosmosException e) { return isStatusCode(e, HttpConstants.StatusCodes.GONE) && isSubStatusCode(e, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 0cb169daac30d..711e970e684e4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -855,7 +855,6 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA public interface CosmosExceptionAccessor { CosmosException createCosmosException(int statusCode, Exception innerException); - CosmosException createSerializableCosmosException(CosmosException cosmosException); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 00aa5cb568596..bda54c73f870a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -16,7 +16,6 @@ import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest; import com.azure.cosmos.implementation.batch.ServerBatchRequest; import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest; -import com.azure.cosmos.implementation.caches.SizeLimitingLRUCache; import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; @@ -39,7 +38,6 @@ import com.azure.cosmos.implementation.query.IDocumentQueryExecutionContext; import com.azure.cosmos.implementation.query.Paginator; import com.azure.cosmos.implementation.query.PartitionedQueryExecutionInfo; -import com.azure.cosmos.implementation.query.PipelinedDocumentQueryExecutionContext; import com.azure.cosmos.implementation.query.PipelinedQueryExecutionContextBase; import com.azure.cosmos.implementation.query.QueryInfo; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; @@ -49,8 +47,8 @@ import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.implementation.spark.OperationContext; -import com.azure.cosmos.implementation.spark.OperationListener; import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; +import com.azure.cosmos.implementation.spark.OperationListener; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal; import com.azure.cosmos.models.CosmosAuthorizationTokenResolver; @@ -372,7 +370,7 @@ private RxDocumentClientImpl(URI serviceEndpoint, this.retryPolicy = new RetryPolicy(this, this.globalEndpointManager, this.connectionPolicy); this.resetSessionTokenRetryPolicy = retryPolicy; CpuMemoryMonitor.register(this); - this.queryPlanCache = Collections.synchronizedMap(new SizeLimitingLRUCache(Constants.QUERYPLAN_CACHE_SIZE)); + this.queryPlanCache = new ConcurrentHashMap<>(); this.apiType = apiType; } catch (RuntimeException e) { logger.error("unexpected failure in initializing client.", e); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java index e780bfee137b6..c8012e1744f9a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java @@ -33,19 +33,11 @@ public class RxDocumentServiceResponse { private CosmosDiagnostics cosmosDiagnostics; public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientContext, StoreResponse response) { - String[] headerNames = response.getResponseHeaderNames(); - String[] headerValues = response.getResponseHeaderValues(); - - this.headersMap = new HashMap<>(headerNames.length); + this.headersMap = new HashMap<>(response.getResponseHeaders()); // Gets status code. this.statusCode = response.getStatus(); - // Extracts headers. - for (int i = 0; i < headerNames.length; i++) { - this.headersMap.put(headerNames[i], headerValues[i]); - } - this.storeResponse = response; this.diagnosticsClientContext = diagnosticsClientContext; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index b0f74659c8dd2..01db6f9380fb2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -8,11 +8,11 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; -import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal; import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader; import com.azure.cosmos.implementation.directconnectivity.HttpUtils; import com.azure.cosmos.implementation.directconnectivity.RequestHelper; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpHeaders; @@ -334,25 +334,23 @@ private Mono toDocumentServiceResponse(Mono { - //Adding transport client request timeline to diagnostics + // Capture transport client request timeline ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().reactorNettyRequestRecord(); if (reactorNettyRequestRecord != null) { reactorNettyRequestRecord.setTimeCompleted(Instant.now()); - BridgeInternal.setGatewayRequestTimelineOnDiagnostics(request.requestContext.cosmosDiagnostics, - reactorNettyRequestRecord.takeTimelineSnapshot()); } // If there is any error in the header response this throws exception - // TODO: potential performance improvement: return Observable.error(exception) on failure instead of throwing Exception validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content); - // transforms to Observable StoreResponse rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()), + HttpUtils.unescape(httpResponseHeaders.toMap()), content); - DirectBridgeInternal.setRequestTimeline(rsp, reactorNettyRequestRecord.takeTimelineSnapshot()); + if (reactorNettyRequestRecord != null) { + rsp.setRequestTimeline(reactorNettyRequestRecord.takeTimelineSnapshot()); + } if (request.requestContext.cosmosDiagnostics != null) { - BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, rsp, null, globalEndpointManager); + BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, rsp, globalEndpointManager); } return rsp; }) @@ -409,13 +407,11 @@ private Mono toDocumentServiceResponse(Mono processMessage(RxDocumentServiceRequest r } if (Exceptions.isThroughputControlRequestRateTooLargeException(dce)) { - BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, null, dce, globalEndpointManager); - BridgeInternal.setCosmosDiagnostics(dce, request.requestContext.cosmosDiagnostics); + if (request.requestContext.cosmosDiagnostics != null) { + BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, dce, globalEndpointManager); + } } return Mono.error(dce); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java index e1a27c8d1b89d..faa7481869515 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java @@ -13,7 +13,6 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.clienttelemetry.ReportPayload; -import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal; import com.azure.cosmos.models.CosmosBatchResponse; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosResponse; @@ -452,8 +451,7 @@ private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Co Iterator eventIterator = null; try { if (storeResponseStatistics.getStoreResult() != null) { - eventIterator = - DirectBridgeInternal.getRequestTimeline(storeResponseStatistics.getStoreResult().toResponse()).iterator(); + eventIterator = storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics().getRequestTimeline().iterator(); } } catch (CosmosException ex) { eventIterator = BridgeInternal.getRequestTimeline(ex).iterator(); @@ -483,8 +481,7 @@ private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Co OffsetDateTime requestStartTime = OffsetDateTime.ofInstant(statistics.getRequestResponseTimeUTC(), ZoneOffset.UTC); if (statistics.getStoreResult() != null) { - Iterator eventIterator = - DirectBridgeInternal.getRequestTimeline(statistics.getStoreResult().toResponse()).iterator(); + Iterator eventIterator = statistics.getStoreResult().getStoreResponseDiagnostics().getRequestTimeline().iterator(); while (eventIterator.hasNext()) { RequestTimeline.Event event = eventIterator.next(); if (event.getName().equals("created")) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/cpu/CpuMemoryReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/cpu/CpuMemoryReader.java index 93e16e758c189..4e4bce33889d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/cpu/CpuMemoryReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/cpu/CpuMemoryReader.java @@ -20,6 +20,9 @@ public CpuMemoryReader() { ManagementFactory.getOperatingSystemMXBean(); } catch (Throwable t) { logger.error("failed to initialized CpuMemoryReader", t); + if (t instanceof Error) { + throw (Error) t; + } } this.operatingSystemMXBean = tryGetAs(mxBean, @@ -38,6 +41,9 @@ public float getSystemWideCpuUsage() { return Float.NaN; } catch (Throwable t) { logger.error("Failed to get System CPU", t); + if (t instanceof Error) { + throw (Error) t; + } return Float.NaN; } } @@ -50,6 +56,9 @@ public long getSystemWideMemoryUsage() { return maxMemory -(totalMemory - freeMemory); } catch (Throwable t) { logger.error("Failed to get System memory", t); + if (t instanceof Error) { + throw (Error) t; + } return 0; } } @@ -59,6 +68,9 @@ private T tryGetAs(java.lang.management.OperatingSystemMXBean mxBean, Class< return Utils.as(mxBean, classType); } catch (Throwable t) { logger.error("failed to initialized CpuMemoryReader as type {}", classType.getName(), t); + if (t instanceof Error) { + throw (Error) t; + } return null; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 6d0363e9ca009..dc80fbc5994fe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -120,6 +120,9 @@ public Mono writeAsync( SessionTokenHelper.setOriginalSessionToken(entity, sessionToken); } catch (Throwable throwable) { logger.error("Unexpected failure in handling orig [{}]: new [{}]", arg, throwable.getMessage(), throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } } } ); @@ -202,6 +205,9 @@ Mono writePrivateAsync( } catch (Throwable throwable) { logger.error("Unexpected failure in handling orig [{}]", t.getMessage(), t); logger.error("Unexpected failure in handling orig [{}] : new [{}]", t.getMessage(), throwable.getMessage(), throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } } } ); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/DirectBridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/DirectBridgeInternal.java deleted file mode 100644 index f5f5cb1e4e6d4..0000000000000 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/DirectBridgeInternal.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.implementation.directconnectivity; - -import com.azure.cosmos.implementation.RequestTimeline; - -/** - * This is meant to be used only internally as a bridge access to classes in - * com.azure.cosmos.implementation.directconnectivity - **/ -public class DirectBridgeInternal { - - public static int getSubStatusCode(StoreResponse storeResponse) { - return storeResponse.getSubStatusCode(); - } - - public static RequestTimeline getRequestTimeline(StoreResponse storeResponse) { - return storeResponse.getRequestTimeline(); - } - - public static void setRequestTimeline(StoreResponse storeResponse, RequestTimeline requestTimeline) { - storeResponse.setRequestTimeline(requestTimeline); - } -} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index af153a87af13d..b7ca904b9c714 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -448,10 +448,7 @@ public Mono> getServerAddressesViaGatewayAsync( } if (request.requestContext.cosmosDiagnostics != null) { - BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, null, - dce, this.globalEndpointManager); - BridgeInternal.setCosmosDiagnostics(dce, - request.requestContext.cosmosDiagnostics); + BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, dce, this.globalEndpointManager); } return Mono.error(dce); @@ -773,10 +770,7 @@ public Mono> getMasterAddressesViaGatewayAsync( } if (request.requestContext.cosmosDiagnostics != null) { - BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, null, - dce, this.globalEndpointManager); - BridgeInternal.setCosmosDiagnostics(dce, - request.requestContext.cosmosDiagnostics); + BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, dce, this.globalEndpointManager); } return Mono.error(dce); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpUtils.java index c5c4d5cfbc0ad..2e5d27ac03dd5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpUtils.java @@ -90,4 +90,12 @@ public static List> unescape(Set> he } return result; } + + public static Map unescape(Map headers) { + if (headers != null) { + headers.computeIfPresent(HttpConstants.HttpHeaders.OWNER_FULL_NAME, + (ownerKey, ownerValue) -> HttpUtils.urlDecode(ownerValue)); + } + return headers; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java index 645a4f2c97984..5a65c97516493 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java @@ -20,7 +20,7 @@ static Mono toStoreResponse(HttpResponse httpClientResponse, Http return contentObservable.map(byteArrayContent -> { // transforms to Mono - return new StoreResponse(httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()), byteArrayContent); + return new StoreResponse(httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap()), byteArrayContent); }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java index 3554f2a93b204..24f6db2244884 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClient.java @@ -112,6 +112,9 @@ public Mono processMessageAsync(RxDocumentServiceRequ } catch (Throwable throwable) { logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } } } ); @@ -138,20 +141,11 @@ private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, C private RxDocumentServiceResponse completeResponse( StoreResponse storeResponse, RxDocumentServiceRequest request) throws InternalServerErrorException { - if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) { - throw new InternalServerErrorException(RMResources.InvalidBackendResponse); - } - Map headers = new HashMap<>(storeResponse.getResponseHeaderNames().length); - for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) { - String name = storeResponse.getResponseHeaderNames()[idx]; - String value = storeResponse.getResponseHeaderValues()[idx]; - - headers.put(name, value); - } + Map responseHeaders = new HashMap<>(storeResponse.getResponseHeaders()); - this.updateResponseHeader(request, headers); - this.captureSessionToken(request, headers); + this.updateResponseHeader(request, responseHeaders); + this.captureSessionToken(request, responseHeaders); BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); RxDocumentServiceResponse rxDocumentServiceResponse = new RxDocumentServiceResponse(this.diagnosticsClientContext, storeResponse); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 5f561f09a39b7..ba3a7b5508ca5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -258,7 +258,7 @@ private Flux> readFromReplicas(List resultCollect } } catch (Exception e) { - // TODO: what to do on exception? + logger.error("Error occurred while adding store results to resultCollector", e); } } @@ -485,6 +485,9 @@ public Mono readPrimaryAsync( SessionTokenHelper.setOriginalSessionToken(entity, originalSessionToken); } catch (Throwable throwable) { logger.error("Unexpected failure in handling orig [{}]: new [{}]", arg, throwable.getMessage(), throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } } } ); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index 4d312c6560889..59d6049260ca9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -11,8 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map.Entry; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Used internally to represents a response from the store. @@ -20,8 +20,7 @@ public class StoreResponse { final static Logger LOGGER = LoggerFactory.getLogger(StoreResponse.class); final private int status; - final private String[] responseHeaderNames; - final private String[] responseHeaderValues; + final private Map responseHeaders; final private byte[] content; private int pendingRequestQueueSize; @@ -36,20 +35,11 @@ public class StoreResponse { public StoreResponse( int status, - List> headerEntries, + Map headerMap, byte[] content) { requestTimeline = RequestTimeline.empty(); - responseHeaderNames = new String[headerEntries.size()]; - responseHeaderValues = new String[headerEntries.size()]; - - int i = 0; - - for(Entry headerEntry: headerEntries) { - responseHeaderNames[i] = headerEntry.getKey(); - responseHeaderValues[i] = headerEntry.getValue(); - i++; - } + responseHeaders = new ConcurrentHashMap<>(headerMap); this.status = status; this.content = content; @@ -62,12 +52,8 @@ public int getStatus() { return status; } - public String[] getResponseHeaderNames() { - return responseHeaderNames; - } - - public String[] getResponseHeaderValues() { - return responseHeaderValues; + public Map getResponseHeaders() { + return responseHeaders; } public int getRntbdChannelTaskQueueSize() { @@ -131,22 +117,16 @@ public String getPartitionKeyRangeId() { return this.getHeaderValue(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); } - public String getContinuation() { - return this.getHeaderValue(HttpConstants.HttpHeaders.CONTINUATION); + public String getActivityId() { + return this.getHeaderValue(HttpConstants.HttpHeaders.ACTIVITY_ID); } - public String getHeaderValue(String attribute) { - if (this.responseHeaderValues == null || this.responseHeaderNames.length != this.responseHeaderValues.length) { - return null; - } - - for (int i = 0; i < responseHeaderNames.length; i++) { - if (responseHeaderNames[i].equalsIgnoreCase(attribute)) { - return responseHeaderValues[i]; - } - } + public String getCorrelatedActivityId() { + return this.getHeaderValue(HttpConstants.HttpHeaders.CORRELATED_ACTIVITY_ID); + } - return null; + public String getHeaderValue(String attribute) { + return responseHeaders.get(attribute); } public double getRequestCharge() { @@ -157,19 +137,11 @@ public double getRequestCharge() { return Double.parseDouble(value); } - /** - * Static factory method to create serializable store response to be used in CosmosDiagnostics - * @param storeResponse store response - * @return serializable store response - */ - public static StoreResponse createSerializableStoreResponse(StoreResponse storeResponse) { - if (storeResponse == null) { - return null; - } - return new StoreResponse(storeResponse); + public String getSessionTokenString() { + return this.getHeaderValue(HttpConstants.HttpHeaders.SESSION_TOKEN); } - void setRequestTimeline(RequestTimeline requestTimeline) { + public void setRequestTimeline(RequestTimeline requestTimeline) { this.requestTimeline = requestTimeline; } @@ -177,7 +149,7 @@ RequestTimeline getRequestTimeline() { return this.requestTimeline; } - void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { + public void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) { this.channelAcquisitionTimeline = channelAcquisitionTimeline; } @@ -185,11 +157,11 @@ RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { return this.channelAcquisitionTimeline; } - void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) { + public void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) { this.rntbdEndpointStatistics = rntbdEndpointStatistics; } - RntbdEndpointStatistics getEndpointStsts() { + RntbdEndpointStatistics getEndpointStatistics() { return this.rntbdEndpointStatistics; } @@ -205,25 +177,4 @@ int getSubStatusCode() { } return subStatusCode; } - - /** - * Private copy constructor, only to be used internally for serialization purposes - *

- * NOTE: This constructor does not copy all the fields to avoid memory issues. - */ - private StoreResponse(StoreResponse storeResponse) { - this.responseHeaderValues = null; - this.responseHeaderNames = null; - this.content = null; - this.status = storeResponse.status; - this.pendingRequestQueueSize = storeResponse.pendingRequestQueueSize; - this.requestPayloadLength = storeResponse.requestPayloadLength; - this.responsePayloadLength = storeResponse.responsePayloadLength; - this.requestTimeline = storeResponse.requestTimeline; - this.channelAcquisitionTimeline = storeResponse.channelAcquisitionTimeline; - this.rntbdChannelTaskQueueSize = storeResponse.rntbdChannelTaskQueueSize; - this.rntbdEndpointStatistics = storeResponse.rntbdEndpointStatistics; - this.rntbdRequestLength = storeResponse.rntbdRequestLength; - this.rntbdResponseLength = storeResponse.rntbdResponseLength; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseDiagnostics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseDiagnostics.java new file mode 100644 index 0000000000000..0ecbe8aabce6f --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseDiagnostics.java @@ -0,0 +1,160 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RequestTimeline; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This represents diagnostics from store response OR from cosmos exception + */ +public class StoreResponseDiagnostics { + final static Logger logger = LoggerFactory.getLogger(StoreResponseDiagnostics.class); + private final String partitionKeyRangeId; + private final String sessionTokenAsString; + private final double requestCharge; + private final String activityId; + private final String correlatedActivityId; + private final int statusCode; + private final int subStatusCode; + private final int pendingRequestQueueSize; + private final int requestPayloadLength; + private final int responsePayloadLength; + private final RequestTimeline requestTimeline; + private final RntbdChannelAcquisitionTimeline channelAcquisitionTimeline; + private final int rntbdChannelTaskQueueSize; + private final RntbdEndpointStatistics rntbdEndpointStatistics; + private final int rntbdRequestLength; + private final int rntbdResponseLength; + private final String exceptionMessage; + private final String exceptionResponseHeaders; + + public static StoreResponseDiagnostics createStoreResponseDiagnostics(StoreResponse storeResponse) { + return new StoreResponseDiagnostics(storeResponse); + } + + public static StoreResponseDiagnostics createStoreResponseDiagnostics(CosmosException cosmosException) { + return new StoreResponseDiagnostics(cosmosException); + } + + private StoreResponseDiagnostics(StoreResponse storeResponse) { + this.partitionKeyRangeId = storeResponse.getPartitionKeyRangeId(); + this.activityId = storeResponse.getActivityId(); + this.correlatedActivityId = storeResponse.getCorrelatedActivityId(); + this.requestCharge = storeResponse.getRequestCharge(); + this.sessionTokenAsString = storeResponse.getSessionTokenString(); + this.statusCode = storeResponse.getStatus(); + this.subStatusCode = storeResponse.getSubStatusCode(); + this.pendingRequestQueueSize = storeResponse.getPendingRequestQueueSize(); + this.requestPayloadLength = storeResponse.getRequestPayloadLength(); + this.responsePayloadLength = storeResponse.getResponseBodyLength(); + this.requestTimeline = storeResponse.getRequestTimeline(); + this.channelAcquisitionTimeline = storeResponse.getChannelAcquisitionTimeline(); + this.rntbdChannelTaskQueueSize = storeResponse.getRntbdChannelTaskQueueSize(); + this.rntbdEndpointStatistics = storeResponse.getEndpointStatistics(); + this.rntbdRequestLength = storeResponse.getRntbdRequestLength(); + this.rntbdResponseLength = storeResponse.getRntbdResponseLength(); + this.exceptionMessage = null; + this.exceptionResponseHeaders = null; + } + + private StoreResponseDiagnostics(CosmosException e) { + this.partitionKeyRangeId = BridgeInternal.getPartitionKeyRangeId(e); + this.activityId = e.getActivityId(); + this.correlatedActivityId = e.getResponseHeaders().get(HttpConstants.HttpHeaders.CORRELATED_ACTIVITY_ID);; + this.requestCharge = e.getRequestCharge(); + this.sessionTokenAsString = e.getResponseHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); + this.statusCode = e.getStatusCode(); + this.subStatusCode = e.getSubStatusCode(); + this.pendingRequestQueueSize = BridgeInternal.getRntbdPendingRequestQueueSize(e); + this.requestPayloadLength = BridgeInternal.getRequestBodyLength(e); + this.responsePayloadLength = BridgeInternal.getRntbdResponseLength(e); + this.requestTimeline = BridgeInternal.getRequestTimeline(e); + this.channelAcquisitionTimeline = BridgeInternal.getChannelAcqusitionTimeline(e); + this.rntbdChannelTaskQueueSize = BridgeInternal.getChannelTaskQueueSize(e); + this.rntbdEndpointStatistics = BridgeInternal.getServiceEndpointStatistics(e); + this.rntbdRequestLength = BridgeInternal.getRntbdRequestLength(e); + this.rntbdResponseLength = BridgeInternal.getRntbdResponseLength(e); + this.exceptionMessage = BridgeInternal.getInnerErrorMessage(e); + this.exceptionResponseHeaders = e.getResponseHeaders() != null ? e.getResponseHeaders().toString() : null; + } + + public int getStatusCode() { + return statusCode; + } + + public int getSubStatusCode() { + return subStatusCode; + } + + public int getPendingRequestQueueSize() { + return pendingRequestQueueSize; + } + + public int getRequestPayloadLength() { + return requestPayloadLength; + } + + public int getResponsePayloadLength() { + return responsePayloadLength; + } + + public RequestTimeline getRequestTimeline() { + return requestTimeline; + } + + public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() { + return channelAcquisitionTimeline; + } + + public int getRntbdChannelTaskQueueSize() { + return rntbdChannelTaskQueueSize; + } + + public RntbdEndpointStatistics getRntbdEndpointStatistics() { + return rntbdEndpointStatistics; + } + + public int getRntbdRequestLength() { + return rntbdRequestLength; + } + + public int getRntbdResponseLength() { + return rntbdResponseLength; + } + + public String getExceptionMessage() { + return exceptionMessage; + } + + public String getPartitionKeyRangeId() { + return partitionKeyRangeId; + } + + public String getSessionTokenAsString() { + return sessionTokenAsString; + } + + public double getRequestCharge() { + return requestCharge; + } + + public String getActivityId() { + return activityId; + } + + public String getCorrelatedActivityId() { + return correlatedActivityId; + } + + public String getExceptionResponseHeaders() { + return exceptionResponseHeaders; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java index 2dc8399cc1480..3184d50b60d50 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java @@ -3,24 +3,16 @@ package com.azure.cosmos.implementation.directconnectivity; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ISessionToken; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalServerErrorException; import com.azure.cosmos.implementation.RMResources; import com.azure.cosmos.implementation.RequestChargeTracker; -import com.azure.cosmos.implementation.Strings; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class StoreResult { private final static Logger logger = LoggerFactory.getLogger(StoreResult.class); @@ -34,7 +26,6 @@ public class StoreResult { final public long numberOfReadRegions; final public long itemLSN; final public ISessionToken sessionToken; - final public String sessionTokenAsString; final public double requestCharge; final public String activityId; final public String correlatedActivityId; @@ -45,7 +36,6 @@ public class StoreResult { final public boolean isNotFoundException; final public boolean isInvalidPartitionException; final public Uri storePhysicalAddress; - final public String storePhysicalAddressAsString; final public boolean isThroughputControlRequestRateTooLargeException; final public Double backendLatencyInMs; @@ -83,16 +73,18 @@ public StoreResult( this.isInvalidPartitionException = this.exception != null && Exceptions.isNameCacheStale(this.exception); this.storePhysicalAddress = storePhysicalAddress; - this.storePhysicalAddressAsString = storePhysicalAddress != null ? storePhysicalAddress.getURIAsString() : null; this.globalCommittedLSN = globalCommittedLSN; this.numberOfReadRegions = numberOfReadRegions; this.itemLSN = itemLSN; this.sessionToken = sessionToken; - this.sessionTokenAsString = sessionToken != null ? sessionToken.convertToString() : null; this.isThroughputControlRequestRateTooLargeException = this.exception != null && Exceptions.isThroughputControlRequestRateTooLargeException(this.exception); this.backendLatencyInMs = backendLatencyInMs; } + public StoreResponse getStoreResponse() { + return storeResponse; + } + public CosmosException getException() throws InternalServerErrorException { if (this.exception == null) { String message = "Exception should be available but found none"; @@ -130,174 +122,12 @@ public StoreResponse toResponse(RequestChargeTracker requestChargeTracker) { } private static void setRequestCharge(StoreResponse response, CosmosException cosmosException, double totalRequestCharge) { + String totalRequestChargeString = Double.toString(totalRequestCharge); if (cosmosException != null) { - cosmosException.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE, - Double.toString(totalRequestCharge)); - } - // Set total charge as final charge for the response. - else if (response.getResponseHeaderNames() != null) { - for (int i = 0; i < response.getResponseHeaderNames().length; ++i) { - if (Strings.areEqualIgnoreCase( - response.getResponseHeaderNames()[i], - HttpConstants.HttpHeaders.REQUEST_CHARGE)) { - response.getResponseHeaderValues()[i] = Double.toString(totalRequestCharge); - break; - } - } + cosmosException.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE, totalRequestChargeString); + } else { + // Set total charge as final charge for the response. + response.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE, totalRequestChargeString); } } - - @Override - public String toString() { - int statusCode = 0; - int subStatusCode = HttpConstants.SubStatusCodes.UNKNOWN; - - if (this.storeResponse != null) { - statusCode = this.storeResponse.getStatus(); - subStatusCode = this.storeResponse.getSubStatusCode(); - } else if (this.exception != null) { - statusCode = this.exception.getStatusCode(); - subStatusCode = this.exception.getSubStatusCode(); - } - - return "storePhysicalAddress: " + this.storePhysicalAddressAsString + - ", lsn: " + this.lsn + - ", globalCommittedLsn: " + this.globalCommittedLSN + - ", partitionKeyRangeId: " + this.partitionKeyRangeId + - ", isValid: " + this.isValid + - ", statusCode: " + statusCode + - ", subStatusCode: " + subStatusCode + - ", isGone: " + this.isGoneException + - ", isNotFound: " + this.isNotFoundException + - ", isThroughputControlRequestRateTooLarge: " + this.isThroughputControlRequestRateTooLargeException + - ", isInvalidPartition: " + this.isInvalidPartitionException + - ", requestCharge: " + this.requestCharge + - ", itemLSN: " + this.itemLSN + - ", sessionToken: " + this.sessionTokenAsString + - ", backendLatencyInMs: " + this.backendLatencyInMs + - ", exception: " + BridgeInternal.getInnerErrorMessage(this.exception); - } - - /** - * Static factory method to create serializable store result to be used in CosmosDiagnostics - * @param storeResult store result - * @return serializable store result - */ - public static StoreResult createSerializableStoreResult(StoreResult storeResult) { - if (storeResult == null) { - return null; - } - return new StoreResult(storeResult); - } - - public static class StoreResultSerializer extends StdSerializer { - private static final long serialVersionUID = 5315472126043077905L; - - public StoreResultSerializer(){ - super(StoreResult.class); - } - - @Override - public void serialize(StoreResult storeResult, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException { - int statusCode = 0; - int subStatusCode = HttpConstants.SubStatusCodes.UNKNOWN; - - if (storeResult.storeResponse != null) { - statusCode = storeResult.storeResponse.getStatus(); - subStatusCode = storeResult.storeResponse.getSubStatusCode(); - } else if (storeResult.exception != null) { - statusCode = storeResult.exception.getStatusCode(); - subStatusCode = storeResult.exception.getSubStatusCode(); - } - jsonGenerator.writeStartObject(); - jsonGenerator.writeObjectField("storePhysicalAddress", storeResult.storePhysicalAddressAsString); - jsonGenerator.writeNumberField("lsn", storeResult.lsn); - jsonGenerator.writeNumberField("globalCommittedLsn", storeResult.globalCommittedLSN); - jsonGenerator.writeStringField("partitionKeyRangeId", storeResult.partitionKeyRangeId); - jsonGenerator.writeBooleanField("isValid", storeResult.isValid); - jsonGenerator.writeNumberField("statusCode", statusCode); - jsonGenerator.writeNumberField("subStatusCode", subStatusCode); - jsonGenerator.writeBooleanField("isGone", storeResult.isGoneException); - jsonGenerator.writeBooleanField("isNotFound", storeResult.isNotFoundException); - jsonGenerator.writeBooleanField("isInvalidPartition", storeResult.isInvalidPartitionException); - jsonGenerator.writeBooleanField("isThroughputControlRequestRateTooLarge", storeResult.isThroughputControlRequestRateTooLargeException); - jsonGenerator.writeNumberField("requestCharge", storeResult.requestCharge); - jsonGenerator.writeNumberField("itemLSN", storeResult.itemLSN); - jsonGenerator.writeStringField("sessionToken", storeResult.sessionTokenAsString); - jsonGenerator.writeObjectField("backendLatencyInMs", storeResult.backendLatencyInMs); - jsonGenerator.writeStringField("exception", BridgeInternal.getInnerErrorMessage(storeResult.exception)); - jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ? - storeResult.storeResponse.getRequestTimeline() : - storeResult.exception != null ? BridgeInternal.getRequestTimeline(storeResult.exception) : null); - - this.writeNonNullObjectField( - jsonGenerator, - "transportRequestChannelAcquisitionContext", - storeResult.storeResponse != null ? storeResult.storeResponse.getChannelAcquisitionTimeline() : - storeResult.exception != null? BridgeInternal.getChannelAcqusitionTimeline(storeResult.exception) : null); - - jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResult.storeResponse != null ? - storeResult.storeResponse.getRntbdRequestLength() : BridgeInternal.getRntbdRequestLength(storeResult.exception)); - jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResult.storeResponse != null ? - storeResult.storeResponse.getRntbdResponseLength() : BridgeInternal.getRntbdResponseLength(storeResult.exception)); - jsonGenerator.writeObjectField("requestPayloadLengthInBytes", storeResult.storeResponse != null ? - storeResult.storeResponse.getRequestPayloadLength() : BridgeInternal.getRequestBodyLength(storeResult.exception)); - jsonGenerator.writeObjectField("responsePayloadLengthInBytes", storeResult.storeResponse != null ? - storeResult.storeResponse.getResponseBodyLength() : null); - jsonGenerator.writeObjectField("channelTaskQueueSize", storeResult.storeResponse != null ? storeResult.storeResponse.getRntbdChannelTaskQueueSize() : - BridgeInternal.getChannelTaskQueueSize(storeResult.exception)); - jsonGenerator.writeObjectField("pendingRequestsCount", storeResult.storeResponse != null ? storeResult.storeResponse.getPendingRequestQueueSize() : - BridgeInternal.getRntbdPendingRequestQueueSize(storeResult.exception)); - jsonGenerator.writeObjectField("serviceEndpointStatistics", storeResult.storeResponse != null ? storeResult.storeResponse.getEndpointStsts() : - storeResult.exception != null ? BridgeInternal.getServiceEndpointStatistics(storeResult.exception) : null); - - jsonGenerator.writeEndObject(); - } - - private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException { - if (object == null) { - return; - } - - jsonGenerator.writeObjectField(fieldName, object); - } - } - - /** - * Private copy constructor, only to be used internally for serialization purposes - *

- * NOTE: This constructor does not copy all the fields to avoid memory issues. - */ - private StoreResult(StoreResult storeResult) { - this.storeResponse = StoreResponse.createSerializableStoreResponse(storeResult.storeResponse); - this.exception = ImplementationBridgeHelpers - .CosmosExceptionHelper - .getCosmosExceptionAccessor() - .createSerializableCosmosException(storeResult.exception); - this.lsn = storeResult.lsn; - this.partitionKeyRangeId = storeResult.partitionKeyRangeId; - this.quorumAckedLSN = storeResult.quorumAckedLSN; - this.globalCommittedLSN = storeResult.globalCommittedLSN; - this.numberOfReadRegions = storeResult.numberOfReadRegions; - this.itemLSN = storeResult.itemLSN; - // Just saving the string representation and nulling out the ISessionToken - this.sessionToken = null; - this.sessionTokenAsString = storeResult.sessionTokenAsString; - this.requestCharge = storeResult.requestCharge; - this.activityId = storeResult.activityId; - this.correlatedActivityId = storeResult.correlatedActivityId; - this.currentReplicaSetSize = storeResult.currentReplicaSetSize; - this.currentWriteQuorum = storeResult.currentWriteQuorum; - this.isValid = storeResult.isValid; - this.isGoneException = storeResult.isGoneException; - this.isNotFoundException = storeResult.isNotFoundException; - this.isInvalidPartitionException = storeResult.isInvalidPartitionException; - // Just saving the string representation and nulling out the URI - this.storePhysicalAddress = null; - this.storePhysicalAddressAsString = storeResult.storePhysicalAddressAsString; - this.isThroughputControlRequestRateTooLargeException = storeResult.isThroughputControlRequestRateTooLargeException; - this.backendLatencyInMs = storeResult.backendLatencyInMs; - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnostics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnostics.java new file mode 100644 index 0000000000000..53a1075ae772b --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnostics.java @@ -0,0 +1,203 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.Exceptions; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * StoreResultDiagnostics is a combination of diagnostics from StoreResult, StoreResponse and CosmosException. + * + * This is just a model class for StoreResult Diagnostics. It doesn't contain any references to the actual store result, store response and cosmos exception. + * We intend to keep it this way - decoupled with store result, store response and cosmos exception. + * + */ +public class StoreResultDiagnostics { + private final static Logger logger = LoggerFactory.getLogger(StoreResultDiagnostics.class); + + // StoreResult fields + private final long lsn; + private final long quorumAckedLSN; + private final long globalCommittedLSN; + private final long numberOfReadRegions; + private final long itemLSN; + private final int currentReplicaSetSize; + private final int currentWriteQuorum; + private final boolean isValid; + private boolean isGoneException; + private boolean isNotFoundException; + private boolean isInvalidPartitionException; + private final String storePhysicalAddressAsString; + private boolean isThroughputControlRequestRateTooLargeException; + private final Double backendLatencyInMs; + + // StoreResponse and CosmosException fields + private StoreResponseDiagnostics storeResponseDiagnostics; + + public static StoreResultDiagnostics createStoreResultDiagnostics(StoreResult storeResult) { + if (storeResult == null) { + return null; + } else if (storeResult.getStoreResponse() != null) { + return new StoreResultDiagnostics(storeResult, storeResult.getStoreResponse()); + } else { + return new StoreResultDiagnostics(storeResult, storeResult.getException()); + } + } + + private StoreResultDiagnostics(StoreResult storeResult) { + this.lsn = storeResult.lsn; + this.quorumAckedLSN = storeResult.quorumAckedLSN; + this.currentReplicaSetSize = storeResult.currentReplicaSetSize; + this.currentWriteQuorum = storeResult.currentWriteQuorum; + this.isValid = storeResult.isValid; + this.storePhysicalAddressAsString = storeResult.storePhysicalAddress != null ? storeResult.storePhysicalAddress.getURIAsString() : null; + this.globalCommittedLSN = storeResult.globalCommittedLSN; + this.numberOfReadRegions = storeResult.numberOfReadRegions; + this.itemLSN = storeResult.itemLSN; + this.backendLatencyInMs = storeResult.backendLatencyInMs; + } + + private StoreResultDiagnostics(StoreResult storeResult, CosmosException e) { + this(storeResult); + this.isGoneException = Exceptions.isGone(e); + this.isNotFoundException = Exceptions.isNotFound(e); + this.isInvalidPartitionException = Exceptions.isNameCacheStale(e); + this.isThroughputControlRequestRateTooLargeException = Exceptions.isThroughputControlRequestRateTooLargeException(e); + this.storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(e); + } + + private StoreResultDiagnostics(StoreResult storeResult, StoreResponse storeResponse) { + this(storeResult); + this.storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(storeResponse); + } + + public long getLsn() { + return lsn; + } + + public long getQuorumAckedLSN() { + return quorumAckedLSN; + } + + public long getGlobalCommittedLSN() { + return globalCommittedLSN; + } + + public long getNumberOfReadRegions() { + return numberOfReadRegions; + } + + public long getItemLSN() { + return itemLSN; + } + + public int getCurrentReplicaSetSize() { + return currentReplicaSetSize; + } + + public int getCurrentWriteQuorum() { + return currentWriteQuorum; + } + + public boolean isValid() { + return isValid; + } + + public boolean isGoneException() { + return isGoneException; + } + + public boolean isNotFoundException() { + return isNotFoundException; + } + + public boolean isInvalidPartitionException() { + return isInvalidPartitionException; + } + + public String getStorePhysicalAddressAsString() { + return storePhysicalAddressAsString; + } + + public boolean isThroughputControlRequestRateTooLargeException() { + return isThroughputControlRequestRateTooLargeException; + } + + public Double getBackendLatencyInMs() { + return backendLatencyInMs; + } + + public StoreResponseDiagnostics getStoreResponseDiagnostics() { + return storeResponseDiagnostics; + } + + public static class StoreResultDiagnosticsSerializer extends StdSerializer { + private static final long serialVersionUID = 5315472126043077905L; + + public StoreResultDiagnosticsSerializer(){ + super(StoreResultDiagnostics.class); + } + + @Override + public void serialize(StoreResultDiagnostics storeResultDiagnostics, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + StoreResponseDiagnostics storeResponseDiagnostics = storeResultDiagnostics.getStoreResponseDiagnostics(); + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("storePhysicalAddress", storeResultDiagnostics.storePhysicalAddressAsString); + jsonGenerator.writeNumberField("lsn", storeResultDiagnostics.lsn); + jsonGenerator.writeNumberField("globalCommittedLsn", storeResultDiagnostics.globalCommittedLSN); + jsonGenerator.writeStringField("partitionKeyRangeId", storeResponseDiagnostics.getPartitionKeyRangeId()); + jsonGenerator.writeBooleanField("isValid", storeResultDiagnostics.isValid); + jsonGenerator.writeNumberField("statusCode", storeResponseDiagnostics.getStatusCode()); + jsonGenerator.writeNumberField("subStatusCode", storeResponseDiagnostics.getSubStatusCode()); + jsonGenerator.writeBooleanField("isGone", storeResultDiagnostics.isGoneException); + jsonGenerator.writeBooleanField("isNotFound", storeResultDiagnostics.isNotFoundException); + jsonGenerator.writeBooleanField("isInvalidPartition", storeResultDiagnostics.isInvalidPartitionException); + jsonGenerator.writeBooleanField("isThroughputControlRequestRateTooLarge", storeResultDiagnostics.isThroughputControlRequestRateTooLargeException); + jsonGenerator.writeNumberField("requestCharge", storeResponseDiagnostics.getRequestCharge()); + jsonGenerator.writeNumberField("itemLSN", storeResultDiagnostics.itemLSN); + jsonGenerator.writeStringField("sessionToken", storeResponseDiagnostics.getSessionTokenAsString()); + jsonGenerator.writeObjectField("backendLatencyInMs", storeResultDiagnostics.backendLatencyInMs); + this.writeNonNullStringField(jsonGenerator, "exceptionMessage", storeResponseDiagnostics.getExceptionMessage()); + this.writeNonNullStringField(jsonGenerator, "exceptionResponseHeaders", storeResponseDiagnostics.getExceptionResponseHeaders()); + jsonGenerator.writeObjectField("transportRequestTimeline", storeResponseDiagnostics.getRequestTimeline()); + + this.writeNonNullObjectField(jsonGenerator,"transportRequestChannelAcquisitionContext", storeResponseDiagnostics.getChannelAcquisitionTimeline()); + + jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResponseDiagnostics.getRntbdRequestLength()); + jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResponseDiagnostics.getRntbdResponseLength()); + jsonGenerator.writeObjectField("requestPayloadLengthInBytes", storeResponseDiagnostics.getRequestPayloadLength()); + jsonGenerator.writeObjectField("responsePayloadLengthInBytes", storeResponseDiagnostics.getResponsePayloadLength()); + jsonGenerator.writeObjectField("channelTaskQueueSize", storeResponseDiagnostics.getRntbdChannelTaskQueueSize()); + jsonGenerator.writeObjectField("pendingRequestsCount", storeResponseDiagnostics.getPendingRequestQueueSize()); + jsonGenerator.writeObjectField("serviceEndpointStatistics", storeResponseDiagnostics.getRntbdEndpointStatistics()); + + jsonGenerator.writeEndObject(); + } + + private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException { + if (object == null) { + return; + } + + jsonGenerator.writeObjectField(fieldName, object); + } + + private void writeNonNullStringField(JsonGenerator jsonGenerator, String fieldName, String value) throws IOException { + if (value == null) { + return; + } + + jsonGenerator.writeStringField(fieldName, value); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java index 38a6c75c60870..6c3f13ddb8eea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java @@ -76,7 +76,9 @@ public void channelCreated(final Channel channel) { */ @Override public void channelReleased(final Channel channel) { - logger.debug("{} CHANNEL RELEASED", channel); + if (logger.isDebugEnabled()) { + logger.debug("{} CHANNEL RELEASED", channel); + } } /** diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index 386c63a9c224e..58eb3d8e07bfe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -1338,7 +1338,7 @@ private void releaseAndOfferChannel(final Channel channel, final Promise p private void releaseAndOfferChannelIfHealthy( final Channel channel, final Promise promise, - final Future future) { + final Future future) throws Exception { final boolean isHealthy = future.getNow(); @@ -1351,18 +1351,13 @@ private void releaseAndOfferChannelIfHealthy( } } else { // Channel is unhealthy so just close and release it - try { - this.poolHandler.channelReleased(channel); - } catch (Throwable error) { - logger.debug("[{}] pool handler failed due to ", this, error); - } finally { - if (this.executor.inEventLoop()) { - this.closeChannel(channel); - } else { - this.executor.submit(() -> this.closeChannel(channel)); - } - promise.setSuccess(null); + this.poolHandler.channelReleased(channel); + if (this.executor.inEventLoop()) { + this.closeChannel(channel); + } else { + this.executor.submit(() -> this.closeChannel(channel)); } + promise.setSuccess(null); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java index b68fb1002bd24..67d59f56d6479 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdMetrics.java @@ -49,8 +49,11 @@ public final class RntbdMetrics { if (step > 0) { RntbdMetrics.add(RntbdMetrics.consoleLoggingRegistry(step)); } - } catch (Throwable error) { - logger.error("failed to initialize console logging registry due to ", error); + } catch (Throwable throwable) { + logger.error("failed to initialize console logging registry due to ", throwable); + if (throwable instanceof Error) { + throw (Error) throwable; + } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReporter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReporter.java index 7a50e83e64398..8e75a3f3fa13e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReporter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdReporter.java @@ -24,6 +24,9 @@ public final class RntbdReporter { value = file.getName(); } catch (Throwable error) { value = "azure-cosmos"; + if (error instanceof Error) { + throw (Error) error; + } } codeSource = value; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index 3ac07a02e76ad..d094ea47a3e27 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -303,7 +303,9 @@ public void exceptionCaught(final ChannelHandlerContext context, final Throwable if (!this.closingExceptionally) { this.completeAllPendingRequestsExceptionally(context, cause); - logger.debug("{} closing due to:", context, cause); + if (logger.isDebugEnabled()) { + logger.debug("{} closing due to:", context, cause); + } context.flush().close(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java index 3e4cb0d89f7cd..0b5b64a13706b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java @@ -361,7 +361,7 @@ StoreResponse toStoreResponse(final RntbdContext context) { this.content.getBytes(0, content); } - return new StoreResponse(this.getStatus().code(), this.headers.asList(context, this.getActivityId()), content); + return new StoreResponse(this.getStatus().code(), this.headers.asMap(context, this.getActivityId()), content); } // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index d9ff8bc6935aa..99eb2f5f85fc5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -4,6 +4,7 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.implementation.BadRequestException; +import com.azure.cosmos.implementation.Constants; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentCollection; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; @@ -217,6 +218,10 @@ private static void tryCacheQueryPlan( Map queryPlanCache) { QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo(); if (canCacheQuery(queryInfo) && !queryPlanCache.containsKey(query.getQueryText())) { + if (queryPlanCache.size() == Constants.QUERYPLAN_CACHE_SIZE) { + logger.warn("Clearing query plan cache as it has reached the maximum size : {}", queryPlanCache.size()); + queryPlanCache.clear(); + } queryPlanCache.put(query.getQueryText(), partitionedQueryExecutionInfo); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java index 8081fb6e37dc1..25a266faef106 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java @@ -436,7 +436,8 @@ private void addDiagnosticsOnTracerEvent(TracerProvider tracerProvider, CosmosDi if (clientSideRequestStatistics.getResponseStatisticsList() != null && clientSideRequestStatistics.getResponseStatisticsList().size() > 0 && clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult() != null) { String eventName = - "Diagnostics for PKRange " + clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult().partitionKeyRangeId; + "Diagnostics for PKRange " + + clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult().getStoreResponseDiagnostics().getPartitionKeyRangeId(); tracerProvider.addEvent(eventName, attributes, OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), parentContext); } else if (clientSideRequestStatistics.getGatewayStatistics() != null) { diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index b9defc04e865f..1ae33ad4dee3c 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -241,6 +241,9 @@ public void gatewayDiagnosticsOnException() throws Exception { assertThat(diagnostics).contains("\"statusCode\":404"); assertThat(diagnostics).contains("\"operationType\":\"Read\""); assertThat(diagnostics).contains("\"userAgent\":\"" + Utils.getUserAgent() + "\""); + assertThat(diagnostics).contains("\"exceptionMessage\":\"Entity with the specified id does not exist in the system."); + assertThat(diagnostics).contains("\"exceptionResponseHeaders\""); + assertThat(diagnostics).doesNotContain("\"exceptionResponseHeaders\": \"{}\""); assertThat(diagnostics).containsAnyOf( "\"machineId\":\"" + tempMachineId + "\"", // logged machineId should be static uuid or "\"machineId\":\"" + ClientTelemetry.getMachineId(null) + "\"" // the vmId from Azure @@ -316,6 +319,9 @@ public void directDiagnostics() throws Exception { } catch (CosmosException e) { diagnostics = e.getDiagnostics().toString(); assertThat(diagnostics).contains("\"backendLatencyInMs\""); + assertThat(diagnostics).contains("\"exceptionMessage\":\"[\\\"Resource with specified id or name already exists.\\\"]\""); + assertThat(diagnostics).contains("\"exceptionResponseHeaders\""); + assertThat(diagnostics).doesNotContain("\"exceptionResponseHeaders\": \"{}\""); validateTransportRequestTimelineDirect(e.getDiagnostics().toString()); } } finally { @@ -670,6 +676,9 @@ public void directDiagnosticsOnException() throws Exception { assertThat(exception.getDiagnostics().getDuration()).isNotNull(); assertThat(diagnostics).contains("\"backendLatencyInMs\""); assertThat(diagnostics).containsPattern("(?s).*?\"activityId\":\"[^\\s\"]+\".*"); + assertThat(diagnostics).contains("\"exceptionMessage\":\"[\\\"Resource Not Found."); + assertThat(diagnostics).contains("\"exceptionResponseHeaders\""); + assertThat(diagnostics).doesNotContain("\"exceptionResponseHeaders\":null"); isValidJSON(diagnostics); validateTransportRequestTimelineDirect(diagnostics); validateRegionContacted(createResponse.getDiagnostics(), client.asyncClient()); @@ -705,6 +714,7 @@ public void directDiagnosticsOnMetadataException() { String diagnostics = exception.getDiagnostics().toString(); assertThat(exception.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.BADREQUEST); assertThat(diagnostics).contains("\"connectionMode\":\"DIRECT\""); + assertThat(diagnostics).contains("\"exceptionMessage\":\"TestBadRequest\""); assertThat(diagnostics).doesNotContain(("\"resourceAddress\":null")); assertThat(diagnostics).contains("\"resourceType\":\"DocumentCollection\""); assertThat(exception.getDiagnostics().getContactedRegionNames()).isNotEmpty(); @@ -818,6 +828,12 @@ public void rntbdRequestResponseLengthStatistics() throws Exception { fail("expected to fail due to 409"); } catch (CosmosException e) { // no request payload and no response payload + logger.info("Diagnostics are : {}", e.getDiagnostics()); + String diagnostics = e.getDiagnostics().toString(); + assertThat(diagnostics).contains("\"exceptionMessage\":\"[\\\"Resource with specified id or name already exists.\\\"]\""); + assertThat(diagnostics).contains("\"exceptionResponseHeaders\""); + assertThat(diagnostics).doesNotContain("\"exceptionResponseHeaders\": \"{}\""); + validate(e.getDiagnostics(), testItemLength, 0); } @@ -915,8 +931,7 @@ private void validateRntbdStatistics(CosmosDiagnostics cosmosDiagnostics, JsonNode storeResult = responseStatisticsList.get(0).get("storeResult"); assertThat(storeResult).isNotNull(); - boolean hasPayload = storeResult.get("exception").isNull(); - assertThat(storeResult.get("channelTaskQueueSize").asInt(-1)).isGreaterThan(0); + assertThat(storeResult.get("channelTaskQueueSize").asInt(-1)).isGreaterThanOrEqualTo(0); assertThat(storeResult.get("pendingRequestsCount").asInt(-1)).isGreaterThanOrEqualTo(0); JsonNode serviceEndpointStatistics = storeResult.get("serviceEndpointStatistics"); @@ -968,7 +983,7 @@ private void validate(CosmosDiagnostics cosmosDiagnostics, int expectedRequestPa assertThat(responseStatisticsList.size()).isGreaterThan(0); JsonNode storeResult = responseStatisticsList.get(0).get("storeResult"); - boolean hasPayload = storeResult.get("exception").isNull(); + boolean hasPayload = storeResult.get("exceptionMessage") == null; assertThat(storeResult).isNotNull(); assertThat(storeResult.get("rntbdRequestLengthInBytes").asInt(-1)).isGreaterThan(expectedRequestPayloadSize); assertThat(storeResult.get("rntbdRequestLengthInBytes").asInt(-1)).isGreaterThan(expectedRequestPayloadSize); @@ -1005,9 +1020,7 @@ public void addressResolutionStatistics() { assertThat(writeResourceResponse.getDiagnostics().toString()).contains("addressResolutionStatistics"); assertThat(writeResourceResponse.getDiagnostics().toString()).contains("\"inflightRequest\":false"); assertThat(writeResourceResponse.getDiagnostics().toString()).doesNotContain("endTime=\"null\""); - assertThat(writeResourceResponse.getDiagnostics().toString()).contains("\"errorMessage\":null"); - assertThat(writeResourceResponse.getDiagnostics().toString()).doesNotContain("\"errorMessage\":\"io.netty" + - ".channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information"); + assertThat(writeResourceResponse.getDiagnostics().toString()).contains("\"exceptionMessage\":null"); client2 = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) @@ -1048,8 +1061,7 @@ public void addressResolutionStatistics() { assertThat(readResourceResponse.getDiagnostics().toString()).contains("addressResolutionStatistics"); assertThat(readResourceResponse.getDiagnostics().toString()).contains("\"inflightRequest\":false"); assertThat(readResourceResponse.getDiagnostics().toString()).doesNotContain("endTime=\"null\""); - assertThat(readResourceResponse.getDiagnostics().toString()).contains("\"errorMessage\":null"); - assertThat(readResourceResponse.getDiagnostics().toString()).contains("\"errorMessage\":\"io.netty" + + assertThat(readResourceResponse.getDiagnostics().toString()).contains("\"exceptionMessage\":\"io.netty" + ".channel.AbstractChannel$AnnotatedConnectException: Connection refused"); } catch (Exception ex) { logger.error("Error in test addressResolutionStatistics", ex); @@ -1098,7 +1110,6 @@ private void clearStoreResponseStatistics(ClientSideRequestStatistics requestSta } private void validateTransportRequestTimelineGateway(String diagnostics) { - assertThat(diagnostics).contains("\"eventName\":\"connectionConfigured\""); assertThat(diagnostics).contains("\"eventName\":\"connectionConfigured\""); assertThat(diagnostics).contains("\"eventName\":\"requestSent\""); assertThat(diagnostics).contains("\"eventName\":\"transitTime\""); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java index 95117dd7d6726..cdad215b410f9 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java @@ -18,7 +18,6 @@ import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerResponse; @@ -573,8 +572,7 @@ private void verifyTracerDiagnostics(TracerProvider tracerProvider, clientSideRequestStatistics.getResponseStatisticsList()) { Iterator eventIterator = null; try { - eventIterator = - DirectBridgeInternal.getRequestTimeline(storeResponseStatistics.getStoreResult().toResponse()).iterator(); + eventIterator = storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics().getRequestTimeline().iterator(); } catch (CosmosException ex) { eventIterator = BridgeInternal.getRequestTimeline(ex).iterator(); } @@ -602,8 +600,7 @@ private void verifyTracerDiagnostics(TracerProvider tracerProvider, ClientSideRequestStatistics.getCappedSupplementalResponseStatisticsList(clientSideRequestStatistics.getSupplementalResponseStatisticsList())) { Iterator eventIterator = null; try { - eventIterator = - DirectBridgeInternal.getRequestTimeline(storeResponseStatistics.getStoreResult().toResponse()).iterator(); + eventIterator = storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics().getRequestTimeline().iterator(); } catch (CosmosException ex) { eventIterator = BridgeInternal.getRequestTimeline(ex).iterator(); } @@ -646,7 +643,8 @@ private void verifyTracerDiagnostics(TracerProvider tracerProvider, , ArgumentMatchers.any(), Mockito.eq(OffsetDateTime.ofInstant(feedResponseDiagnostics.getQueryPlanDiagnosticsContext().getStartTimeUTC(), ZoneOffset.UTC)), ArgumentMatchers.any()); - assertThat(attributesMap.get("Query Plan Statistics").get("JSON")).isEqualTo(OBJECT_MAPPER.writeValueAsString(feedResponseDiagnostics.getQueryPlanDiagnosticsContext())); + assertThat(attributesMap.get("Query Plan Statistics").get("JSON")) + .isEqualTo(OBJECT_MAPPER.writeValueAsString(feedResponseDiagnostics.getQueryPlanDiagnosticsContext())); } counter = 1; @@ -654,12 +652,14 @@ private void verifyTracerDiagnostics(TracerProvider tracerProvider, feedResponseDiagnostics.getClientSideRequestStatisticsList()) { if (clientSideStatistics.getResponseStatisticsList() != null && clientSideStatistics.getResponseStatisticsList().size() > 0 && clientSideStatistics.getResponseStatisticsList().get(0).getStoreResult() != null) { - Mockito.verify(tracerProvider, Mockito.atLeast(1)).addEvent(Mockito.eq("Diagnostics for PKRange " + clientSideStatistics.getResponseStatisticsList().get(0).getStoreResult().partitionKeyRangeId) + Mockito.verify(tracerProvider, Mockito.atLeast(1)).addEvent(Mockito.eq("Diagnostics for PKRange " + + clientSideStatistics.getResponseStatisticsList().get(0).getStoreResult().getStoreResponseDiagnostics().getPartitionKeyRangeId()) , ArgumentMatchers.any(), Mockito.eq(OffsetDateTime.ofInstant(clientSideStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC)), ArgumentMatchers.any()); } else if (clientSideStatistics.getGatewayStatistics() != null) { - Mockito.verify(tracerProvider, Mockito.atLeast(1)).addEvent(Mockito.eq("Diagnostics for PKRange " + clientSideStatistics.getGatewayStatistics().getPartitionKeyRangeId()) + Mockito.verify(tracerProvider, Mockito.atLeast(1)).addEvent(Mockito.eq("Diagnostics for PKRange " + + clientSideStatistics.getGatewayStatistics().getPartitionKeyRangeId()) , ArgumentMatchers.any(), Mockito.eq(OffsetDateTime.ofInstant(clientSideStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC)), ArgumentMatchers.any()); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java index fd208059a70a1..1172440b6004a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java @@ -61,7 +61,7 @@ import java.net.URISyntaxException; import java.nio.charset.Charset; import java.time.Duration; -import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.Callable; @@ -94,7 +94,7 @@ public void backoffRetryUtilityExecuteRetry() throws Exception { addressSelector = Mockito.mock(AddressSelector.class); CosmosException exception = new CosmosException(410, exceptionText); Mockito.when(callbackMethod.call()).thenThrow(exception, exception, exception, exception, exception) - .thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText)))); + .thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText)))); Mono monoResponse = BackoffRetryUtility.executeRetry(callbackMethod, retryPolicy); StoreResponse response = validateSuccess(monoResponse); @@ -137,7 +137,7 @@ public void backoffRetryUtilityExecuteAsync() { CosmosException exception = new CosmosException(410, exceptionText); Mono exceptionMono = Mono.error(exception); Mockito.when(parameterizedCallbackMethod.apply(ArgumentMatchers.any())).thenReturn(exceptionMono, exceptionMono, exceptionMono, exceptionMono, exceptionMono) - .thenReturn(Mono.just(new StoreResponse(200, new ArrayList<>(), getUTF8BytesOrNull(responseText)))); + .thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText)))); Mono monoResponse = BackoffRetryUtility.executeAsync( parameterizedCallbackMethod, retryPolicy, diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java index 62b4aa2898f71..4dabdeaf9b6e7 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java @@ -7,16 +7,14 @@ import com.azure.cosmos.implementation.directconnectivity.WFConstants; import java.math.BigDecimal; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; import java.util.Map; import static com.azure.cosmos.implementation.Utils.getUTF8BytesOrNull; public class StoreResponseBuilder { private int status; - private List> headerEntries; + private Map headers; private String content; public static StoreResponseBuilder create() { @@ -24,16 +22,16 @@ public static StoreResponseBuilder create() { } public StoreResponseBuilder() { - headerEntries = new ArrayList<>(); + headers = new HashMap<>(); } public StoreResponseBuilder withHeader(String key, String value) { - headerEntries.add(new AbstractMap.SimpleEntry<>(key, value)); + headers.put(key, value); return this; } public StoreResponseBuilder withLSN(long lsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.LSN, Long.toString(lsn))); + headers.put(WFConstants.BackendHeaders.LSN, Long.toString(lsn)); return this; } @@ -43,42 +41,42 @@ public StoreResponseBuilder withRequestCharge(BigDecimal requestCharge) { } public StoreResponseBuilder withRequestCharge(double requestCharge) { - headerEntries.add(new AbstractMap.SimpleEntry<>(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge))); + headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge)); return this; } public StoreResponseBuilder withLocalLSN(long localLsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.LOCAL_LSN, Long.toString(localLsn))); + headers.put(WFConstants.BackendHeaders.LOCAL_LSN, Long.toString(localLsn)); return this; } public StoreResponseBuilder withPartitionKeyRangeId(String partitionKeyRangeId) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID, partitionKeyRangeId)); + headers.put(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID, partitionKeyRangeId); return this; } public StoreResponseBuilder withItemLocalLSN(long itemLocalLsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, Long.toString(itemLocalLsn))); + headers.put(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, Long.toString(itemLocalLsn)); return this; } public StoreResponseBuilder withQuorumAckecdLsn(long quorumAckecdLsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.QUORUM_ACKED_LSN, Long.toString(quorumAckecdLsn))); + headers.put(WFConstants.BackendHeaders.QUORUM_ACKED_LSN, Long.toString(quorumAckecdLsn)); return this; } public StoreResponseBuilder withQuorumAckecdLocalLsn(long quorumAckecdLocalLsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(quorumAckecdLocalLsn))); + headers.put(WFConstants.BackendHeaders.QUORUM_ACKED_LOCAL_LSN, Long.toString(quorumAckecdLocalLsn)); return this; } public StoreResponseBuilder withGlobalCommittedLsn(long globalCommittedLsn) { - headerEntries.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, Long.toString(globalCommittedLsn))); + headers.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, Long.toString(globalCommittedLsn)); return this; } public StoreResponseBuilder withSessionToken(String sessionToken) { - headerEntries.add(new AbstractMap.SimpleEntry<>(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken)); + headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken); return this; } @@ -93,6 +91,6 @@ public StoreResponseBuilder withContent(String content) { } public StoreResponse build() { - return new StoreResponse(status, headerEntries, getUTF8BytesOrNull(content)); + return new StoreResponse(status, headers, getUTF8BytesOrNull(content)); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java index 9ee18f141e42a..1d3eac53c972e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java @@ -76,7 +76,7 @@ public void validateAllSetValuesInCosmosBulkItemResponse() { StoreResponse storeResponse = new StoreResponse( HttpResponseStatus.OK.code(), - new ArrayList<>(headers.entrySet()), + headers, responseContent.getBytes(StandardCharsets.UTF_8)); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( @@ -155,7 +155,7 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() { StoreResponse storeResponse = new StoreResponse( HttpResponseStatus.OK.code(), - new ArrayList<>(), + new HashMap<>(), responseContent.getBytes(StandardCharsets.UTF_8)); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java index a52ad11e31858..f2467977df117 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java @@ -72,7 +72,7 @@ public void validateAllSetValuesInResponse() { StoreResponse storeResponse = new StoreResponse( HttpResponseStatus.OK.code(), - new ArrayList<>(headers.entrySet()), + headers, responseContent.getBytes(StandardCharsets.UTF_8)); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( @@ -132,7 +132,7 @@ public void validateEmptyHeaderInResponse() { StoreResponse storeResponse = new StoreResponse( HttpResponseStatus.OK.code(), - new ArrayList<>(), + new HashMap<>(), responseContent.getBytes(StandardCharsets.UTF_8)); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index 6ca2ff8cb357c..02051ef22c022 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -17,10 +17,12 @@ import com.azure.cosmos.implementation.RequestTimeoutException; import com.azure.cosmos.implementation.RetryContext; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.SessionTokenHelper; import com.azure.cosmos.implementation.StoreResponseBuilder; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import io.reactivex.subscribers.TestSubscriber; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.testng.annotations.DataProvider; @@ -28,9 +30,9 @@ import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; -import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -122,6 +124,49 @@ public void exception(Exception ex, Class klass, int expectedStatusCo failureValidator.validate(subscriber.errors().get(0)); } + @Test(groups = "unit") + public void writeAsync_Error() { + TransportClientWrapper transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder + .SequentialBuilder() + .then(Mockito.mock(StoreResponse.class)) + .build(); + + Uri primaryUri = Uri.create("primary"); + Uri secondaryUri1 = Uri.create("secondary1"); + Uri secondaryUri2 = Uri.create("secondary2"); + Uri secondaryUri3 = Uri.create("secondary3"); + + AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create() + .withPrimary(primaryUri) + .withSecondary(ImmutableList.of(secondaryUri1, secondaryUri2, secondaryUri3)) + .build(); + sessionContainer = Mockito.mock(ISessionContainer.class); + IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class); + serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class); + MockedStatic sessionTokenHelperMockedStatic = Mockito.mockStatic(SessionTokenHelper.class); + + consistencyWriter = new ConsistencyWriter(clientContext, + addressSelectorWrapper.addressSelector, + sessionContainer, + transportClientWrapper.transportClient, + authorizationTokenProvider, + serviceConfigReader, + false); + + TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class); + RxDocumentServiceRequest dsr = mockDocumentServiceRequest(clientContext); + String outOfMemoryError = "Custom out of memory error"; + Mockito.doThrow(new OutOfMemoryError(outOfMemoryError)).when(SessionTokenHelper.class); + SessionTokenHelper.setOriginalSessionToken(dsr, null); + + Mono res = consistencyWriter.writeAsync(dsr, timeoutHelper, false); + + FailureValidator validator = FailureValidator.builder().instanceOf(OutOfMemoryError.class).errorMessageContains(outOfMemoryError).build(); + validateError(res, validator); + // Finally, close the mocked static thread + sessionTokenHelperMockedStatic.close(); + } + @Test(groups = "unit") public void startBackgroundAddressRefresh() throws Exception { initializeConsistencyWriter(false); @@ -162,10 +207,9 @@ public void run() { @Test(groups = "unit") public void getLsnAndGlobalCommittedLsn() { - ImmutableList.Builder> builder = new ImmutableList.Builder<>(); - builder.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.LSN, "3")); - builder.add(new AbstractMap.SimpleEntry<>(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2")); - ImmutableList> headers = builder.build(); + Map headers = new HashMap<>(); + headers.put(WFConstants.BackendHeaders.LSN, "3"); + headers.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2"); StoreResponse sr = new StoreResponse(0, headers, null); Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-2l); @@ -337,6 +381,18 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) { useMultipleWriteLocation); } + public static void validateError(Mono single, + FailureValidator validator) { + TestSubscriber testSubscriber = new TestSubscriber<>(); + + try { + single.flux().subscribe(testSubscriber); + } catch (Throwable throwable) { + assertThat(throwable).isInstanceOf(Error.class); + validator.validate(throwable); + } + } + // TODO: add more mocking unit tests for Global STRONG (mocking unit tests) // TODO: add more tests for SESSION behaviour (mocking unit tests) // TODO: add more tests for error handling behaviour (mocking unit tests) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpUtilsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpUtilsTest.java index d79a2bff8f784..209e216391b1b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpUtilsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/HttpUtilsTest.java @@ -8,7 +8,7 @@ import org.mockito.Mockito; import org.testng.annotations.Test; -import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -33,8 +33,8 @@ public void verifyConversionOfHttpResponseHeadersToMap() { assertThat(entry.getKey()).isEqualTo(HttpConstants.HttpHeaders.OWNER_FULL_NAME); assertThat(entry.getValue()).isEqualTo(HttpUtils.urlDecode(OWNER_FULL_NAME_VALUE)); - List> resultHeadersList = HttpUtils.unescape(httpResponseHeaders.toMap().entrySet()); - assertThat(resultHeadersList.size()).isEqualTo(1); + Map resultHeaders = HttpUtils.unescape(httpResponseHeaders.toMap()); + assertThat(resultHeaders.size()).isEqualTo(1); entry = resultHeadersSet.iterator().next(); assertThat(entry.getKey()).isEqualTo(HttpConstants.HttpHeaders.OWNER_FULL_NAME); assertThat(entry.getValue()).isEqualTo(HttpUtils.urlDecode(OWNER_FULL_NAME_VALUE)); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java index d5d405eb72ca1..7db49a4310d8b 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java @@ -22,12 +22,14 @@ import com.azure.cosmos.implementation.RequestRateTooLargeException; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.SessionTokenHelper; import com.azure.cosmos.implementation.StoreResponseBuilder; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.VectorSessionToken; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import io.reactivex.subscribers.TestSubscriber; import org.assertj.core.api.AssertionsForClassTypes; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -562,6 +564,43 @@ public void readPrimaryAsync_GoneExceptionOnTimeout() { validateException(readResult, validator); } + @Test(groups = "unit") + public void readPrimaryAsync_Error() { + TransportClient transportClient = Mockito.mock(TransportClient.class); + AddressSelector addressSelector = Mockito.mock(AddressSelector.class); + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + + Uri primaryURI = Uri.create("primaryLoc"); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(), + OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document); + + request.requestContext = Mockito.mock(DocumentServiceRequestContext.class); + request.requestContext.timeoutHelper = Mockito.mock(TimeoutHelper.class); + request.requestContext.resolvedPartitionKeyRange = partitionKeyRangeWithId("12"); + request.requestContext.requestChargeTracker = new RequestChargeTracker(); + + Mockito.doReturn(Mono.just(primaryURI)).when(addressSelector).resolvePrimaryUriAsync( + Mockito.eq(request) , Mockito.eq(false)); + + StoreResponse storeResponse = Mockito.mock(StoreResponse.class); + Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.eq(primaryURI), Mockito.eq(request)); + + StoreReader storeReader = new StoreReader(transportClient, addressSelector, sessionContainer); + + String outOfMemoryError = "Custom out of memory error"; + MockedStatic sessionTokenHelperMockedStatic = Mockito.mockStatic(SessionTokenHelper.class); + Mockito.doThrow(new OutOfMemoryError(outOfMemoryError)).when(SessionTokenHelper.class); + SessionTokenHelper.setOriginalSessionToken(request, null); + + Mono readResult = storeReader.readPrimaryAsync(request, true, true); + + FailureValidator validator = FailureValidator.builder().instanceOf(OutOfMemoryError.class).errorMessageContains(outOfMemoryError).build(); + validateError(readResult, validator); + // Finally, close the mocked static thread + sessionTokenHelperMockedStatic.close(); + } + @Test(groups = "unit") public void canParseLongLsn() { TransportClient transportClient = Mockito.mock(TransportClient.class); @@ -911,6 +950,18 @@ public static void validateException(Mono single, validateException(single, validator, TIMEOUT); } + public static void validateError(Mono single, + FailureValidator validator) { + TestSubscriber testSubscriber = new TestSubscriber<>(); + + try { + single.flux().subscribe(testSubscriber); + } catch (Throwable throwable) { + assertThat(throwable).isInstanceOf(Error.class); + validator.validate(throwable); + } + } + private int getMatchingElementCount(String cosmosDiagnostics, String regex) { Pattern storeResultPattern = Pattern.compile(regex); Matcher matcher = storeResultPattern.matcher(cosmosDiagnostics); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java index 569abe5d51fc7..219b673dfbc85 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java @@ -19,7 +19,7 @@ public void stringContent() { headerMap.put("key1", "value1"); headerMap.put("key2", "value2"); - StoreResponse sp = new StoreResponse(200, new ArrayList<>(headerMap.entrySet()), getUTF8BytesOrNull(content)); + StoreResponse sp = new StoreResponse(200, headerMap, getUTF8BytesOrNull(content)); assertThat(sp.getStatus()).isEqualTo(200); assertThat(sp.getResponseBody()).isEqualTo(getUTF8BytesOrNull(content)); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseValidator.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseValidator.java index 1d5aee4f0beee..dba3a36c2d117 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseValidator.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseValidator.java @@ -6,8 +6,8 @@ import org.assertj.core.api.Condition; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -38,7 +38,7 @@ public Builder hasHeader(String headerKey) { validators.add(new StoreResponseValidator() { @Override public void validate(StoreResponse resp) { - assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey); + assertThat(resp.getResponseHeaders().containsKey(headerKey)).isTrue(); } }); return this; @@ -48,9 +48,9 @@ public Builder withHeader(String headerKey, String headerValue) { validators.add(new StoreResponseValidator() { @Override public void validate(StoreResponse resp) { - assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey); - int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey); - assertThat(resp.getResponseHeaderValues()[index]).isEqualTo(headerValue); + Map responseHeaders = resp.getResponseHeaders(); + assertThat(responseHeaders.containsKey(headerKey)).isTrue(); + assertThat(responseHeaders.get(headerKey)).isEqualTo(headerValue); } }); return this; @@ -61,10 +61,9 @@ public Builder withHeaderValueCondition(String headerKey, Condition cond validators.add(new StoreResponseValidator() { @Override public void validate(StoreResponse resp) { - assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey); - int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey); - String value = resp.getResponseHeaderValues()[index]; - condition.matches(value); + Map responseHeaders = resp.getResponseHeaders(); + assertThat(responseHeaders.containsKey(headerKey)).isTrue(); + condition.matches(responseHeaders.get(headerKey)); } }); return this; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DocumentCrudTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DocumentCrudTest.java index c4bbf6f209758..1920f4191b031 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DocumentCrudTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/DocumentCrudTest.java @@ -88,7 +88,7 @@ public void createDocument_AlreadyExists(String documentId) throws InterruptedEx @Test(groups = { "emulator" }, timeOut = TIMEOUT, dataProvider = "documentCrudArgProvider") public void createDocumentTimeout(String documentId) throws InterruptedException { InternalObjectNode docDefinition = getDocumentDefinition(documentId); - Mono> createObservable = container.createItem(docDefinition, new CosmosItemRequestOptions()).timeout(Duration.ofMillis(1)); + Mono> createObservable = container.createItem(docDefinition, new CosmosItemRequestOptions()).timeout(Duration.ofNanos(10)); FailureValidator validator = new FailureValidator.Builder().instanceOf(TimeoutException.class).build(); validateItemFailure(createObservable, validator); }