Skip to content

Commit

Permalink
ThroughputControl- Discard response out of cycle (Azure#21369)
Browse files Browse the repository at this point in the history
* No ru tracking if response come back out of cycle

Co-authored-by: annie-mac <annie-mac@annie-macs-MacBook-Pro.local>
  • Loading branch information
xinlian12 and annie-mac authored May 14, 2021
1 parent 03a7b5d commit 8fac2bb
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class DocumentServiceRequestContext implements Cloneable {
public volatile PartitionKeyInternal effectivePartitionKey;
public volatile CosmosDiagnostics cosmosDiagnostics;
public volatile String resourcePhysicalAddress;
public volatile String throughputControlCycleId;

public DocumentServiceRequestContext() {
}
Expand Down Expand Up @@ -99,7 +100,7 @@ public DocumentServiceRequestContext clone() {
context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh;
context.cosmosDiagnostics = this.cosmosDiagnostics;
context.resourcePhysicalAddress = this.resourcePhysicalAddress;

context.throughputControlCycleId = this.throughputControlCycleId;
return context;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.throughputControl;

import com.azure.cosmos.implementation.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ThroughputControlTrackingUnit {

private static final Logger logger = LoggerFactory.getLogger(ThroughputControlTrackingUnit.class);

private final OperationType operationType;
private final AtomicInteger rejectedRequests;
private final AtomicInteger passedRequests;
private final AtomicReference<Double> successRuUsage;
private final AtomicInteger successResponse;
private final AtomicInteger failedResponse;
private final AtomicInteger outOfCycleResponse;
private String throughputControlCycleId;

public ThroughputControlTrackingUnit(OperationType operationType, String throughputControlCycleId) {
this.operationType = operationType;

this.rejectedRequests = new AtomicInteger(0);
this.passedRequests = new AtomicInteger(0);
this.successRuUsage = new AtomicReference<>(0d);
this.successResponse = new AtomicInteger(0);
this.failedResponse = new AtomicInteger(0);
this.outOfCycleResponse = new AtomicInteger(0);
this.throughputControlCycleId = throughputControlCycleId;
}

public void reset(String newCycleId) {
if (this.rejectedRequests.get() > 0
|| this.passedRequests.get() > 0
|| this.successResponse.get() > 0
|| this.failedResponse.get() > 0) {

double sAvgRuPerRequest = 0.0;
if (this.successResponse.get() != 0) {
sAvgRuPerRequest = successRuUsage.get() / this.successResponse.get();
}

logger.debug(
"[CycleId: {}, operationType: {}, rejectedCnt: {}, passedCnt: {}, sAvgRu: {}, successCnt: {}, failedCnt: {}, outOfCycleCnt: {}]",
this.throughputControlCycleId,
this.operationType.toString(),
this.rejectedRequests.get(),
this.passedRequests.get(),
sAvgRuPerRequest,
this.successResponse.get(),
this.failedResponse.get(),
this.outOfCycleResponse.get());
}

this.rejectedRequests.set(0);
this.passedRequests.set(0);
this.successRuUsage.set(0d);
this.successResponse.set(0);
this.failedResponse.set(0);
this.outOfCycleResponse.set(0);
this.throughputControlCycleId = newCycleId;
}

public void increasePassedRequest(){
this.passedRequests.incrementAndGet();
}

public void increaseRejectedRequest(){
this.rejectedRequests.incrementAndGet();
}

public void increaseSuccessResponse() {
this.successResponse.incrementAndGet();
}

public void increaseFailedResponse() { this.failedResponse.incrementAndGet(); }

public void increaseOutOfCycleResponse() { this.outOfCycleResponse.incrementAndGet(); }

public void trackRRuUsage(double ruUsage) {
this.successRuUsage.getAndAccumulate(ruUsage, (available, newRuUsage) -> available + newRuUsage);
}

public int getRejectedRequests() {
return rejectedRequests.get();
}

public int getPassedRequests() {
return passedRequests.get();
}

public double getSuccessRuUsage() {
return successRuUsage.get();
}

public int getSuccessResponse() {
return successResponse.get();
}

public int getFailedResponse() {
return failedResponse.get();
}

public int getOutOfCycleResponse() {
return outOfCycleResponse.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -29,13 +33,20 @@ public class ThroughputRequestThrottler {
private final AtomicReference<Double> scheduledThroughput;
private final ReentrantReadWriteLock.WriteLock throughputWriteLock;
private final ReentrantReadWriteLock.ReadLock throughputReadLock;
private final ConcurrentHashMap<OperationType, ThroughputControlTrackingUnit> trackingDictionary;
private final String pkRangeId;
private String cycleId;

public ThroughputRequestThrottler(double scheduledThroughput) {
public ThroughputRequestThrottler(double scheduledThroughput, String pkRangeId) {
this.availableThroughput = new AtomicReference<>(scheduledThroughput);
this.scheduledThroughput = new AtomicReference<>(scheduledThroughput);
ReentrantReadWriteLock throughputReadWriteLock = new ReentrantReadWriteLock();
this.throughputWriteLock = throughputReadWriteLock.writeLock();
this.throughputReadLock = throughputReadWriteLock.readLock();

this.trackingDictionary = new ConcurrentHashMap<>();
this.cycleId = UUID.randomUUID().toString();
this.pkRangeId = pkRangeId;
}

public double renewThroughputUsageCycle(double scheduledThroughput) {
Expand All @@ -45,6 +56,17 @@ public double renewThroughputUsageCycle(double scheduledThroughput) {
this.scheduledThroughput.set(scheduledThroughput);
this.updateAvailableThroughput();

if (throughputUsagePercentage > 0) {
logger.debug(
"[CycleId: {}, pkRangeId: {}, ruUsagePercentage: {}]",
this.cycleId, this.pkRangeId, throughputUsagePercentage);
}

String newCycleId = UUID.randomUUID().toString();
for (ThroughputControlTrackingUnit trackingUnit : this.trackingDictionary.values()) {
trackingUnit.reset(newCycleId);
}
this.cycleId = newCycleId;
return throughputUsagePercentage;
} finally {
this.throughputWriteLock.unlock();
Expand All @@ -60,15 +82,30 @@ private void updateAvailableThroughput() {
public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> originalRequestMono) {
try {
this.throughputReadLock.lock();
ThroughputControlTrackingUnit trackingUnit =
this.trackingDictionary.compute(request.getOperationType(), ((key, value) -> {
if (value == null) {
value = new ThroughputControlTrackingUnit(request.getOperationType(), this.cycleId);
}
return value;
}));

if (this.availableThroughput.get() > 0) {
if (StringUtils.isEmpty(request.requestContext.throughputControlCycleId)) {
request.requestContext.throughputControlCycleId = this.cycleId;
}

trackingUnit.increasePassedRequest();
return originalRequestMono
.doOnSuccess(response -> this.trackRequestCharge(response))
.doOnError(throwable -> this.trackRequestCharge(throwable));
.doOnSuccess(response -> this.trackRequestCharge(request, response))
.doOnError(throwable -> this.trackRequestCharge(request, throwable));
} else {
trackingUnit.increaseRejectedRequest();

// there is no enough throughput left, block request
RequestRateTooLargeException requestRateTooLargeException = new RequestRateTooLargeException();

int backoffTimeInMilliSeconds = (int)Math.floor(Math.abs(this.availableThroughput.get() * 1000 / this.scheduledThroughput.get()));
int backoffTimeInMilliSeconds = (int)Math.ceil(Math.abs(this.availableThroughput.get() / this.scheduledThroughput.get())) * 1000;

requestRateTooLargeException.getResponseHeaders().put(
HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS,
Expand All @@ -87,14 +124,14 @@ public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> orig
} finally {
this.throughputReadLock.unlock();
}

}

private <T> void trackRequestCharge (T response) {
private <T> void trackRequestCharge (RxDocumentServiceRequest request, T response) {
try {
// Read lock is enough here.
this.throughputReadLock.lock();
double requestCharge = 0;
boolean failedRequest = false;
if (response instanceof StoreResponse) {
requestCharge = ((StoreResponse)response).getRequestCharge();
} else if (response instanceof RxDocumentServiceResponse) {
Expand All @@ -103,13 +140,31 @@ private <T> void trackRequestCharge (T response) {
CosmosException cosmosException = Utils.as(Exceptions.unwrap((Throwable) response), CosmosException.class);
if (cosmosException != null) {
requestCharge = cosmosException.getRequestCharge();
failedRequest = true;
}
}

ThroughputControlTrackingUnit trackingUnit = trackingDictionary.get(request.getOperationType());
if (trackingUnit != null) {
if (failedRequest) {
trackingUnit.increaseFailedResponse();
} else {
trackingUnit.increaseSuccessResponse();
trackingUnit.trackRRuUsage(requestCharge);
}
}

// If the response comes back in a different cycle, discard it.
if (StringUtils.equals(this.cycleId, request.requestContext.throughputControlCycleId)) {
this.availableThroughput.getAndAccumulate(requestCharge, (available, consumed) -> available - consumed);
} else {
if (trackingUnit != null) {
trackingUnit.increaseOutOfCycleResponse();
}
}
this.availableThroughput.getAndAccumulate(requestCharge, (available, consumed) -> available - consumed);
} finally {
this.throughputReadLock.unlock();
}

}

public double getAvailableThroughput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.implementation.throughputControl.controller.request;

import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.throughputControl.ThroughputRequestThrottler;
import reactor.core.publisher.Mono;

Expand All @@ -15,7 +16,7 @@ public class GlobalThroughputRequestController implements IThroughputRequestCont

public GlobalThroughputRequestController(double initialScheduledThroughput) {
this.scheduledThroughput = new AtomicReference<>(initialScheduledThroughput);
this.requestThrottler = new ThroughputRequestThrottler(this.scheduledThroughput.get());
this.requestThrottler = new ThroughputRequestThrottler(this.scheduledThroughput.get(), StringUtils.EMPTY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private void createRequestThrottlers() {
for (PartitionKeyRange pkRange : pkRanges) {
requestThrottlerMap.compute(pkRange.getId(), (pkRangeId, requestThrottler) -> {
if (requestThrottler == null) {
requestThrottler = new ThroughputRequestThrottler(throughputPerPkRange);
requestThrottler = new ThroughputRequestThrottler(throughputPerPkRange, pkRangeId);
}

return requestThrottler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxStoreModel;
Expand All @@ -28,6 +29,8 @@
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlTests;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlTrackingUnit;
import com.azure.cosmos.implementation.throughputControl.ThroughputRequestThrottler;
import com.azure.cosmos.implementation.throughputControl.controller.request.GlobalThroughputRequestController;
import com.azure.cosmos.implementation.throughputControl.controller.request.PkRangesThroughputRequestController;
Expand Down Expand Up @@ -299,4 +302,10 @@ public static AsyncCache<String, CollectionRoutingMap> getRoutingMapAsyncCache(R
public static AtomicBoolean isInitialized(CosmosAsyncContainer cosmosAsyncContainer) {
return get(AtomicBoolean.class, cosmosAsyncContainer, "isInitialized");
}

@SuppressWarnings("unchecked")
public static ConcurrentHashMap<OperationType, ThroughputControlTrackingUnit> getThroughputControlTrackingDictionary(
ThroughputRequestThrottler requestThrottler) {
return get(ConcurrentHashMap.class, requestThrottler, "trackingDictionary");
}
}
Loading

0 comments on commit 8fac2bb

Please sign in to comment.