Skip to content

Commit

Permalink
Improved diagnostics with new models for StoreResponse, StoreResult a…
Browse files Browse the repository at this point in the history
…nd CosmosException (Azure#28620)

* Improved diagnostics with new models for StoreResponse, StoreResult and CosmosException

* Fixed SpotBugs issues related to storeResult.getException()

* Updated query plan cache to ConcurrentHashMap with fixed size of 1000 to start with

* Added exception response headers and message to direct and gateway errors. Also added code for throwing any java.lang.Error

* Added unit tests for StoreReader and ConsistencyWriter

* Disabled StoreReader unit test for error since it is causing other tests to fail. Will investigate later

* Commented out the broken test

* Reverted StoreReaderTest

* Removed mockito-inline

* Fixed StoreReaderTest static mocking

* Fixed ConsistencyWriterTest static mocking

* Code review comments and changelog addition

* Fixed some test cases
  • Loading branch information
kushagraThapar authored May 17, 2022
1 parent 60cf956 commit 957740d
Show file tree
Hide file tree
Showing 44 changed files with 725 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
} catch (Throwable error) {
concurrencyControlSemaphore.release();
logger.error("subscription failed due to ", error);
if (error instanceof Error) {
throw (Error) error;
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed `java.lang.Error` instances not being propagated (bubbled up) to the caller - See [PR 28620](https://github.com/Azure/azure-sdk-for-java/pull/28620)

#### Other Changes
* Added `exceptionMessage` and `exceptionResponseHeaders` to `CosmosDiagnostics` in case of any exceptions - See [PR 28620](https://github.com/Azure/azure-sdk-for-java/pull/28620)
* Improved performance of `query plan` cache by using `ConcurrentHashMap` with a fixed size of 1000 - See [PR 28537](https://github.com/Azure/azure-sdk-for-java/pull/28537)
* Changed 429 (Throttling) retry policy to have an upper bound for the back-off time of 5 seconds - See [PR 28764](https://github.com/Azure/azure-sdk-for-java/pull/28764)

### 4.29.1 (2022-04-27)
Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ Licensed under the MIT License.
<version>4.0.0</version> <!-- {x-version-update;org.mockito:mockito-core;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.0.0</version> <!-- {x-version-update;org.mockito:mockito-inline;external_dependency} -->
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Warning;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
Expand Down Expand Up @@ -561,18 +563,13 @@ public static ClientSideRequestStatistics getClientSideRequestStatics(CosmosDiag
return clientSideRequestStatistics;
}

/**
 * Stores the given request timeline as the gateway request timeline on the
 * client-side request statistics held by the supplied diagnostics instance.
 *
 * @param cosmosDiagnostics diagnostics whose client-side request statistics are updated
 * @param requestTimeline timeline passed through to {@code setGatewayRequestTimeline}
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static void setGatewayRequestTimelineOnDiagnostics(CosmosDiagnostics cosmosDiagnostics,
RequestTimeline requestTimeline) {
cosmosDiagnostics.clientSideRequestStatistics().setGatewayRequestTimeline(requestTimeline);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static void recordResponse(CosmosDiagnostics cosmosDiagnostics,
RxDocumentServiceRequest request,
StoreResult storeResult,
GlobalEndpointManager globalEndpointManager) {
cosmosDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResult, globalEndpointManager);
StoreResultDiagnostics storeResultDiagnostics = StoreResultDiagnostics.createStoreResultDiagnostics(storeResult);
cosmosDiagnostics.clientSideRequestStatistics().recordResponse(request, storeResultDiagnostics, globalEndpointManager);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
Expand Down Expand Up @@ -602,9 +599,19 @@ public static SerializationDiagnosticsContext getSerializationDiagnosticsContext
public static void recordGatewayResponse(CosmosDiagnostics cosmosDiagnostics,
RxDocumentServiceRequest rxDocumentServiceRequest,
StoreResponse storeResponse,
CosmosException exception,
GlobalEndpointManager globalEndpointManager) {
cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponse, exception, globalEndpointManager);
StoreResponseDiagnostics storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(storeResponse);
cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponseDiagnostics, globalEndpointManager);
}

/**
 * Records a gateway response that is represented by a {@link CosmosException}.
 * <p>
 * A {@code StoreResponseDiagnostics} snapshot is created from the exception and recorded on the
 * client-side request statistics of {@code cosmosDiagnostics}; afterwards the same diagnostics
 * instance is attached to the exception.
 *
 * @param cosmosDiagnostics diagnostics that receive the recorded gateway response
 * @param rxDocumentServiceRequest request the response belongs to
 * @param cosmosException exception from which the diagnostics snapshot is created; it is
 *                        dereferenced unconditionally, so it must not be {@code null}
 * @param globalEndpointManager endpoint manager forwarded to the request statistics
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static void recordGatewayResponse(CosmosDiagnostics cosmosDiagnostics,
RxDocumentServiceRequest rxDocumentServiceRequest,
CosmosException cosmosException,
GlobalEndpointManager globalEndpointManager) {
// Capture the exception as a diagnostics model before recording it.
StoreResponseDiagnostics storeResponseDiagnostics = StoreResponseDiagnostics.createStoreResponseDiagnostics(cosmosException);
cosmosDiagnostics.clientSideRequestStatistics().recordGatewayResponse(rxDocumentServiceRequest, storeResponseDiagnostics, globalEndpointManager);
// Attach the diagnostics to the exception only after the response has been recorded.
cosmosException.setDiagnostics(cosmosDiagnostics);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ protected CosmosException(int statusCode, String message, Map<String, String> re
this.statusCode = statusCode;
this.responseHeaders = new ConcurrentHashMap<>();

// ConcurrentHashMap does not accept null keys or values, so filter them out before inserting.
if (responseHeaders != null) {
for (Map.Entry<String, String> entry: responseHeaders.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
Expand Down Expand Up @@ -553,31 +554,6 @@ void setRntbdPendingRequestQueueSize(int rntbdPendingRequestQueueSize) {
/**
 * Creates a {@link CosmosException} by delegating to the
 * {@code CosmosException(int, Exception)} constructor.
 *
 * @param statusCode status code passed to the constructor
 * @param innerException inner exception passed to the constructor
 * @return the newly constructed exception
 */
public CosmosException createCosmosException(int statusCode, Exception innerException) {
return new CosmosException(statusCode, innerException);
}

/**
 * Builds a copy of the given exception.
 * <p>
 * The copy is constructed from the source's status code, {@code cosmosError} and response
 * headers, and then receives the source's transport-level fields (timelines, RNTBD
 * statistics, LSN, partition key range id, request URI, resource address and payload/length
 * values). {@code requestHeaders} and {@code cosmosDiagnostics} are explicitly set to
 * {@code null} on the copy.
 *
 * @param cosmosException exception to copy; may be {@code null}
 * @return the copy, or {@code null} when {@code cosmosException} is {@code null}
 */
@Override
public CosmosException createSerializableCosmosException(CosmosException cosmosException) {
if (cosmosException == null) {
return null;
}
CosmosException exception = new CosmosException(cosmosException.statusCode,
cosmosException.cosmosError, cosmosException.getResponseHeaders());
// Reference/value copies of the source's fields; nothing is deep-cloned here.
exception.requestTimeline = cosmosException.requestTimeline;
exception.channelAcquisitionTimeline = cosmosException.channelAcquisitionTimeline;
exception.rntbdChannelTaskQueueSize = cosmosException.rntbdChannelTaskQueueSize;
exception.rntbdEndpointStatistics = cosmosException.rntbdEndpointStatistics;
exception.lsn = cosmosException.lsn;
exception.partitionKeyRangeId = cosmosException.partitionKeyRangeId;
exception.requestUri = cosmosException.requestUri;
exception.resourceAddress = cosmosException.resourceAddress;
exception.requestPayloadLength = cosmosException.requestPayloadLength;
exception.rntbdPendingRequestQueueSize = cosmosException.rntbdPendingRequestQueueSize;
exception.rntbdRequestLength = cosmosException.rntbdRequestLength;
exception.rntbdResponseLength = cosmosException.rntbdResponseLength;
exception.sendingRequestHasStarted = cosmosException.sendingRequestHasStarted;
// Deliberately not carried over to the copy.
exception.requestHeaders = null;
exception.cosmosDiagnostics = null;
return exception;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
import com.azure.cosmos.implementation.directconnectivity.DirectBridgeInternal;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics;
import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
Expand Down Expand Up @@ -43,7 +41,6 @@ public class ClientSideRequestStatistics {
private Set<URI> locationEndpointsContacted;
private RetryContext retryContext;
private GatewayStatistics gatewayStatistics;
private RequestTimeline gatewayRequestTimeline;
private MetadataDiagnosticsContext metadataDiagnosticsContext;
private SerializationDiagnosticsContext serializationDiagnosticsContext;

Expand Down Expand Up @@ -93,13 +90,13 @@ public DiagnosticsClientContext.DiagnosticsClientConfig getDiagnosticsClientConf
return diagnosticsClientConfig;
}

public void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult, GlobalEndpointManager globalEndpointManager) {
public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnostics storeResultDiagnostics, GlobalEndpointManager globalEndpointManager) {
Objects.requireNonNull(request, "request is required and cannot be null.");
Instant responseTime = Instant.now();

StoreResponseStatistics storeResponseStatistics = new StoreResponseStatistics();
storeResponseStatistics.requestResponseTimeUTC = responseTime;
storeResponseStatistics.storeResult = StoreResult.createSerializableStoreResult(storeResult);
storeResponseStatistics.storeResult = storeResultDiagnostics;
storeResponseStatistics.requestOperationType = request.getOperationType();
storeResponseStatistics.requestResourceType = request.getResourceType();
activityId = request.getActivityId().toString();
Expand All @@ -117,8 +114,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResult storeRe
this.requestEndTimeUTC = responseTime;
}

// TODO (kuthapar): globalEndpointManager != null check is just for safety for hotfix. Remove it after further investigation
if (locationEndPoint != null && globalEndpointManager != null) {
if (locationEndPoint != null) {
this.regionsContacted.add(globalEndpointManager.getRegionName(locationEndPoint, request.getOperationType()));
this.locationEndpointsContacted.add(locationEndPoint);
}
Expand All @@ -133,8 +129,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResult storeRe
}

public void recordGatewayResponse(
RxDocumentServiceRequest rxDocumentServiceRequest, StoreResponse storeResponse,
CosmosException exception, GlobalEndpointManager globalEndpointManager) {
RxDocumentServiceRequest rxDocumentServiceRequest, StoreResponseDiagnostics storeResponseDiagnostics, GlobalEndpointManager globalEndpointManager) {
Instant responseTime = Instant.now();

synchronized (this) {
Expand All @@ -148,8 +143,7 @@ public void recordGatewayResponse(
}
this.recordRetryContextEndTime();

// TODO (kuthapar): globalEndpointManager != null check is just for safety for hotfix. Remove it after further investigation
if (locationEndPoint != null && globalEndpointManager != null) {
if (locationEndPoint != null) {
this.regionsContacted.add(globalEndpointManager.getRegionName(locationEndPoint, rxDocumentServiceRequest.getOperationType()));
this.locationEndpointsContacted.add(locationEndPoint);
}
Expand All @@ -159,34 +153,18 @@ public void recordGatewayResponse(
this.gatewayStatistics.operationType = rxDocumentServiceRequest.getOperationType();
this.gatewayStatistics.resourceType = rxDocumentServiceRequest.getResourceType();
}
if (storeResponse != null) {
this.gatewayStatistics.statusCode = storeResponse.getStatus();
this.gatewayStatistics.subStatusCode = DirectBridgeInternal.getSubStatusCode(storeResponse);
this.gatewayStatistics.sessionToken = storeResponse
.getHeaderValue(HttpConstants.HttpHeaders.SESSION_TOKEN);
this.gatewayStatistics.requestCharge = storeResponse
.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE);
this.gatewayStatistics.requestTimeline = DirectBridgeInternal.getRequestTimeline(storeResponse);
this.gatewayStatistics.partitionKeyRangeId = storeResponse.getPartitionKeyRangeId();
this.activityId= storeResponse.getHeaderValue(HttpConstants.HttpHeaders.ACTIVITY_ID);
} else if (exception != null) {
this.gatewayStatistics.statusCode = exception.getStatusCode();
this.gatewayStatistics.subStatusCode = exception.getSubStatusCode();
this.gatewayStatistics.requestTimeline = this.gatewayRequestTimeline;
this.gatewayStatistics.requestCharge= String.valueOf(exception.getRequestCharge());
this.activityId=exception.getActivityId();
}
this.gatewayStatistics.statusCode = storeResponseDiagnostics.getStatusCode();
this.gatewayStatistics.subStatusCode = storeResponseDiagnostics.getSubStatusCode();
this.gatewayStatistics.sessionToken = storeResponseDiagnostics.getSessionTokenAsString();
this.gatewayStatistics.requestCharge = storeResponseDiagnostics.getRequestCharge();
this.gatewayStatistics.requestTimeline = storeResponseDiagnostics.getRequestTimeline();
this.gatewayStatistics.partitionKeyRangeId = storeResponseDiagnostics.getPartitionKeyRangeId();
this.gatewayStatistics.exceptionMessage = storeResponseDiagnostics.getExceptionMessage();
this.gatewayStatistics.exceptionResponseHeaders = storeResponseDiagnostics.getExceptionResponseHeaders();
this.activityId = storeResponseDiagnostics.getActivityId();
}
}

/**
 * Stores the given timeline in the {@code gatewayRequestTimeline} field.
 *
 * @param transportRequestTimeline timeline to store
 */
public void setGatewayRequestTimeline(RequestTimeline transportRequestTimeline) {
this.gatewayRequestTimeline = transportRequestTimeline;
}

/**
 * Returns the value of the {@code gatewayRequestTimeline} field.
 *
 * @return the stored gateway request timeline
 */
public RequestTimeline getGatewayRequestTimeline() {
return this.gatewayRequestTimeline;
}

public String recordAddressResolutionStart(
URI targetEndpoint,
boolean forceRefresh,
Expand All @@ -209,7 +187,7 @@ public String recordAddressResolutionStart(
return identifier;
}

public void recordAddressResolutionEnd(String identifier, String errorMessage) {
public void recordAddressResolutionEnd(String identifier, String exceptionMessage) {
if (StringUtils.isEmpty(identifier)) {
return;
}
Expand All @@ -227,7 +205,7 @@ public void recordAddressResolutionEnd(String identifier, String errorMessage) {

AddressResolutionStatistics resolutionStatistics = this.addressResolutionStatistics.get(identifier);
resolutionStatistics.endTimeUTC = responseTime;
resolutionStatistics.errorMessage = errorMessage;
resolutionStatistics.exceptionMessage = exceptionMessage;
resolutionStatistics.inflightRequest = false;
}
}
Expand Down Expand Up @@ -297,16 +275,16 @@ public GatewayStatistics getGatewayStatistics() {
}

public static class StoreResponseStatistics {
@JsonSerialize(using = StoreResult.StoreResultSerializer.class)
private StoreResult storeResult;
@JsonSerialize(using = StoreResultDiagnostics.StoreResultDiagnosticsSerializer.class)
private StoreResultDiagnostics storeResult;
@JsonSerialize(using = DiagnosticsInstantSerializer.class)
private Instant requestResponseTimeUTC;
@JsonSerialize
private ResourceType requestResourceType;
@JsonSerialize
private OperationType requestOperationType;

public StoreResult getStoreResult() {
public StoreResultDiagnostics getStoreResult() {
return storeResult;
}

Expand Down Expand Up @@ -409,7 +387,7 @@ public static class AddressResolutionStatistics {
@JsonSerialize
private String targetEndpoint;
@JsonSerialize
private String errorMessage;
private String exceptionMessage;
@JsonSerialize
private boolean forceRefresh;
@JsonSerialize
Expand All @@ -433,8 +411,8 @@ public String getTargetEndpoint() {
return targetEndpoint;
}

public String getErrorMessage() {
return errorMessage;
public String getExceptionMessage() {
return exceptionMessage;
}

public boolean isInflightRequest() {
Expand All @@ -456,9 +434,11 @@ public static class GatewayStatistics {
private ResourceType resourceType;
private int statusCode;
private int subStatusCode;
private String requestCharge;
private double requestCharge;
private RequestTimeline requestTimeline;
private String partitionKeyRangeId;
private String exceptionMessage;
private String exceptionResponseHeaders;

public String getSessionToken() {
return sessionToken;
Expand All @@ -476,7 +456,7 @@ public int getSubStatusCode() {
return subStatusCode;
}

public String getRequestCharge() {
public double getRequestCharge() {
return requestCharge;
}

Expand All @@ -491,6 +471,14 @@ public ResourceType getResourceType() {
/**
 * Returns the partition key range id recorded in these gateway statistics.
 *
 * @return the value of the {@code partitionKeyRangeId} field
 */
public String getPartitionKeyRangeId() {
return partitionKeyRangeId;
}

/**
 * Returns the exception message recorded in these gateway statistics.
 *
 * @return the value of the {@code exceptionMessage} field
 */
public String getExceptionMessage() {
return exceptionMessage;
}

/**
 * Returns the exception response headers recorded in these gateway statistics,
 * as a single string.
 *
 * @return the value of the {@code exceptionResponseHeaders} field
 */
public String getExceptionResponseHeaders() {
return exceptionResponseHeaders;
}
}

public static SystemInformation fetchSystemInformation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ public static boolean isSubStatusCode(CosmosException e, int subStatus) {
return subStatus == e.getSubStatusCode();
}

/**
 * Checks the exception against {@code HttpConstants.StatusCodes.GONE}
 * by delegating to {@code isStatusCode}.
 *
 * @param e exception to inspect
 * @return the result of {@code isStatusCode(e, HttpConstants.StatusCodes.GONE)}
 */
public static boolean isGone(CosmosException e) {
return isStatusCode(e, HttpConstants.StatusCodes.GONE);
}

/**
 * Checks the exception against {@code HttpConstants.StatusCodes.NOTFOUND}
 * by delegating to {@code isStatusCode}.
 *
 * @param e exception to inspect
 * @return the result of {@code isStatusCode(e, HttpConstants.StatusCodes.NOTFOUND)}
 */
public static boolean isNotFound(CosmosException e) {
return isStatusCode(e, HttpConstants.StatusCodes.NOTFOUND);
}

public static boolean isPartitionSplit(CosmosException e) {
return isStatusCode(e, HttpConstants.StatusCodes.GONE)
&& isSubStatusCode(e, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,6 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA

/**
 * Accessor interface with factory methods that return {@link CosmosException} instances.
 */
public interface CosmosExceptionAccessor {
// Creates an exception from a status code and an inner exception.
CosmosException createCosmosException(int statusCode, Exception innerException);
// Creates a CosmosException derived from an existing one.
CosmosException createSerializableCosmosException(CosmosException cosmosException);
}
}

Expand Down
Loading

0 comments on commit 957740d

Please sign in to comment.