Skip to content

Commit

Permalink
test: deflake flow control test (#2259)
Browse files Browse the repository at this point in the history
Change-Id: I222078817739f8190faefffe405bd01af9c96df9

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed
- [ ] All new data plane features have a completed end to end testing plan

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
igorbernstein2 authored Jun 12, 2024
1 parent eea4eb0 commit 7247c32
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -264,6 +264,10 @@ public ApiFuture<List<MutateRowsResponse>> futureCall(
try {
Thread.sleep(Integer.valueOf(latencyHeader.get(0)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ApiFutures.immediateFailedFuture(
new IllegalStateException(
"Interrupted while sleeping as requested: " + latencyHeader, e));
}
if (Integer.valueOf(latencyHeader.get(0)) == DEADLINE_EXCEEDED_LATENCY) {
return ApiFutures.immediateFailedFuture(
Expand All @@ -277,32 +281,30 @@ public ApiFuture<List<MutateRowsResponse>> futureCall(

private void createFlowControlEvent(final FlowController flowController) throws Exception {
flowController.reserve(INITIAL_ELEMENT, 0);
final AtomicBoolean threadStarted = new AtomicBoolean(false);
CompletableFuture<Void> threadStarted = new CompletableFuture<>();
CompletableFuture<Void> threadReservedOne = new CompletableFuture<>();
Thread t =
new Thread(
new Runnable() {
@Override
public void run() {
threadStarted.complete(null);
try {
threadStarted.set(true);
flowController.reserve(1, 0);
threadReservedOne.complete(null);
} catch (Exception e) {
threadReservedOne.completeExceptionally(e);
}
}
});
t.start();
// Wait 5 seconds for the thread to start, and 50 milliseconds after it's started to make sure
// Wait 50 milliseconds after the thread has started to make sure
// flowController.reserve(1, 0) is blocked and creates a throttling event. It should never take
// so long.
for (int i = 0; i < 1000; i++) {
if (threadStarted.get()) {
break;
}
Thread.sleep(5);
}
threadStarted.get();
Thread.sleep(50);
flowController.release(INITIAL_ELEMENT, 0);
t.join();
threadReservedOne.get();
flowController.release(1, 0);

assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent()).isNotNull();
Expand Down

0 comments on commit 7247c32

Please sign in to comment.