Skip to content

JAVA-3149: Support request cancellation in request throttler #1950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ public void cancel() {

cancelScheduledTasks(null);
cancelGlobalTimeout();
throttler.signalCancel(this);
}

private void cancelGlobalTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public class GraphRequestHandler implements Throttled {
try {
if (t instanceof CancellationException) {
cancelScheduledTasks();
context.getRequestThrottler().signalCancel(this);
}
} catch (Throwable t2) {
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ public interface RequestThrottler extends Closeable {
* perform time-based eviction on pending requests.
*/
void signalTimeout(@NonNull Throttled request);

/**
* Signals that a request has been cancelled. This indicates to the throttler that another request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a little bit to come around to why we couldn't just reuse an existing api method (much like the originator of the JIRA re-used signalTimeout), but I think i agree that it makes sense that we add a new API method.

Calling future.cancel() does not actually cancel an inflight query, which I suppose is a similar story to signalTimeout in that C* nodes may still not be working on that query.

I think it's useful to make this a separate method so throttling implementations can make their own decision about how to handle cancel. 👍

* might be started.
*/
default void signalCancel(@NonNull Throttled request) {
// no-op for backward compatibility purposes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ protected CqlPrepareHandler(
try {
if (t instanceof CancellationException) {
cancelTimeout();
context.getRequestThrottler().signalCancel(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thought I immediately had was, "is it possible that we might signal to the throttler twice for the same request?".

It doesn't appear to be possible from what I can tell. It looks like CompletableFuture.cancel(..) cannot cancel a completed exception, so this should be fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I think the same.

}
} catch (Throwable t2) {
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ protected CqlRequestHandler(
try {
if (t instanceof CancellationException) {
cancelScheduledTasks();
context.getRequestThrottler().signalCancel(this);
}
} catch (Throwable t2) {
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,22 @@ public void signalTimeout(@NonNull Throttled request) {
}
}

@Override
public void signalCancel(@NonNull Throttled request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code for signalTimeout/signalCancel is functionally the same except for the log, but it's probably fine this way (complexity of method is low enough that refactoring into a common method is probably not worth it).

lock.lock();
try {
if (!closed) {
if (queue.remove(request)) { // The request has been cancelled before it was active
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
} else {
onRequestDone();
}
}
} finally {
lock.unlock();
}
}

@SuppressWarnings("GuardedBy") // this method is only called with the lock held
private void onRequestDone() {
assert lock.isHeldByCurrentThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public void signalTimeout(@NonNull Throttled request) {
// nothing to do
}

@Override
public void signalCancel(@NonNull Throttled request) {
// nothing to do
}

@Override
public void close() throws IOException {
// nothing to do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ public void signalTimeout(@NonNull Throttled request) {
}
}

@Override
public void signalCancel(@NonNull Throttled request) {
lock.lock();
try {
if (!closed && queue.remove(request)) { // The request has been cancelled before it was active
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
}
} finally {
lock.unlock();
}
}

@Override
public void close() {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void should_allow_new_request_when_active_one_times_out() {
should_allow_new_request_when_active_one_completes(throttler::signalTimeout);
}

@Test
public void should_allow_new_request_when_active_one_canceled() {
should_allow_new_request_when_active_one_completes(throttler::signalCancel);
}

private void should_allow_new_request_when_active_one_completes(
Consumer<Throttled> completeCallback) {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.NettyOptions;
import com.datastax.oss.driver.internal.core.util.concurrent.ScheduledTaskCapturingEventLoop;
Expand All @@ -33,6 +34,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -164,6 +166,15 @@ public void should_reject_when_queue_is_full() {

@Test
public void should_remove_timed_out_request_from_queue() {
testRemoveInvalidEventFromQueue(throttler::signalTimeout);
}

@Test
public void should_remove_cancel_request_from_queue() {
testRemoveInvalidEventFromQueue(throttler::signalCancel);
}

private void testRemoveInvalidEventFromQueue(Consumer<Throttled> completeCallback) {
// Given
for (int i = 0; i < 5; i++) {
throttler.register(new MockThrottled());
Expand All @@ -174,7 +185,7 @@ public void should_remove_timed_out_request_from_queue() {
throttler.register(queued2);

// When
throttler.signalTimeout(queued1);
completeCallback.accept(queued1);

// Then
assertThatStage(queued2.started).isNotDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -39,21 +42,20 @@
public class ThrottlingIT {

private static final String QUERY = "select * from foo";
private static final int maxConcurrentRequests = 10;
private static final int maxQueueSize = 10;

@Rule public SimulacronRule simulacron = new SimulacronRule(ClusterSpec.builder().withNodes(1));

@Test
public void should_reject_request_when_throttling_by_concurrency() {
private DriverConfigLoader loader = null;

@Before
public void setUp() {
// Add a delay so that requests don't complete during the test
simulacron
.cluster()
.prime(PrimeDsl.when(QUERY).then(PrimeDsl.noRows()).delay(5, TimeUnit.SECONDS));

int maxConcurrentRequests = 10;
int maxQueueSize = 10;

DriverConfigLoader loader =
loader =
SessionUtils.configLoaderBuilder()
.withClass(
DefaultDriverOption.REQUEST_THROTTLER_CLASS,
Expand All @@ -63,7 +65,10 @@ public void should_reject_request_when_throttling_by_concurrency() {
maxConcurrentRequests)
.withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize)
.build();
}

@Test
public void should_reject_request_when_throttling_by_concurrency() {
try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {

// Saturate the session and fill the queue
Expand All @@ -81,4 +86,19 @@ public void should_reject_request_when_throttling_by_concurrency() {
+ "(concurrent requests: 10, queue size: 10)");
}
}

@Test
public void should_propagate_cancel_to_throttler() {
try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {

// Try to saturate the session and fill the queue
for (int i = 0; i < maxConcurrentRequests + maxQueueSize; i++) {
CompletionStage<AsyncResultSet> future = session.executeAsync(QUERY);
future.toCompletableFuture().cancel(true);
}

// The next query should be successful, because the previous queries were cancelled
session.execute(QUERY);
}
}
}