Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Jul 18, 2023
1 parent 0f5f07f commit 099c624
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 3 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private void commitAndRun(Substream winningSubstream) {
Runnable postCommitTask = commit(winningSubstream);

if (postCommitTask != null) {
postCommitTask.run();
callExecutor.execute(postCommitTask);
}
}

Expand Down
125 changes: 123 additions & 2 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Test;
Expand Down Expand Up @@ -190,7 +192,7 @@ Status prestart() {

private RetriableStream<String> retriableStream =
newThrottledRetriableStream(null /* throttle */);
private final RetriableStream<String> hedgingStream =
private RetriableStream<String> hedgingStream =
newThrottledHedgingStream(null /* throttle */);

private ClientStreamTracer bufferSizeTracer;
Expand All @@ -206,9 +208,13 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_
}

// Convenience overload: delegates to the two-argument variant, supplying
// MoreExecutors.directExecutor() as the executor.
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
return newThrottledHedgingStream(throttle, MoreExecutors.directExecutor());
}

// Builds a RecordedRetriableStream configured with HEDGING_POLICY and the given throttle. The
// caller-supplied executor is passed in the position where MoreExecutors.directExecutor() was
// previously hard-coded, so a test can choose which executor the stream is constructed with.
// NOTE(review): this diff view shows both the previous argument line and its replacement;
// presumably only the `executor, ...` line exists in the committed file - confirm.
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle, Executor executor) {
return new RecordedRetriableStream(
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(),
executor, fakeClock.getScheduledExecutorService(),
null, HEDGING_POLICY, throttle);
}

Expand Down Expand Up @@ -2482,6 +2488,121 @@ public void hedging_pushback_negative() {
inOrder.verifyNoMoreInteractions();
}

// This covers a hedging deadlock that occurs when multiple in-flight streams race as their
// transports call back, particularly for OkHttp:
// e.g. stream1's subListener gets closed() and in turn creates another stream. This ends up with
// the transport1 thread lock being held while waiting for the transport2 lock in order to create
// the new stream.
// Stream2's subListener gets headersRead() and then tries to commit and cancel all other drained
// streams, including the ones that are on transport1. This causes the transport2 thread lock to
// be held while waiting for the transport1 lock (cancelling a stream requires the lock). Thus
// deadlock.
// Deadlock could also happen when two streams both get headersRead() at the same time.
// It is believed that retry does not have the issue because streams are created sequentially.
// The 15 s timeout turns a real deadlock between the two simulated transport threads into a test
// failure instead of a hung build.
@Test(timeout = 15000)
public void hedging_deadlock() throws Exception {
// Simulates two transports, each guarded by its own lock, delivering callbacks to different
// hedged substreams concurrently, and checks that both callback threads can finish.
ClientStream mockStream1 = mock(ClientStream.class); //on transport1
ClientStream mockStream2 = mock(ClientStream.class); //on transport2
ClientStream mockStream3 = mock(ClientStream.class); //on transport2
ClientStream mockStream4 = mock(ClientStream.class); //on transport1

// Stand-ins for the per-transport locks described in the comment above this test.
ReentrantLock transport1Lock = new ReentrantLock();
ReentrantLock transport2Lock = new ReentrantLock();
// mockStream4 is intentionally not part of the ordered verification below.
InOrder inOrder = inOrder(
mockStream1, mockStream2, mockStream3,
retriableStreamRecorder, masterListener);
// The first three substreams are handed out directly. Creating the fourth acquires
// transport1Lock first, modelling "creating a stream on transport1 requires its lock".
when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1)
.thenReturn(mockStream2)
.thenReturn(mockStream3)
.thenAnswer(new Answer<ClientStream>() {

@Override
public ClientStream answer(InvocationOnMock invocation) throws Throwable {
// NOTE(review): the lock is acquired here and not released inside this answer.
transport1Lock.lock();
return mockStream4;
}
});

// Use the fake clock's executor instead of a direct executor; presumably work handed to it
// only runs when the test drives the fake clock (see runDueTasks() at the end) - confirm.
hedgingStream = newThrottledHedgingStream(null, fakeClock.getScheduledExecutorService());
hedgingStream.start(masterListener);
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();

// Advancing by one hedging delay starts the second substream.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();

// Advancing by another hedging delay starts the third substream.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();

// Cancelling a transport2 stream must briefly take transport2Lock, modelling
// "cancel stream requires lock".
doAnswer(new Answer<Void>() {
@Override
@SuppressWarnings("LockNotBeforeTry")
public Void answer(InvocationOnMock invocation) throws Throwable {
transport2Lock.lock();
transport2Lock.unlock();
return null;
}
}).when(mockStream3).cancel(any(Status.class));

// Same lock-then-unlock behaviour for cancelling the other transport2 stream.
doAnswer(new Answer<Void>() {
@Override
@SuppressWarnings("LockNotBeforeTry")
public Void answer(InvocationOnMock invocation) throws Throwable {
transport2Lock.lock();
transport2Lock.unlock();
return null;
}
}).when(mockStream2).cancel(any(Status.class));

// Counted down once the transport1 thread has returned from headersRead().
CountDownLatch latch = new CountDownLatch(1);
// Transport1 thread: delivers headersRead() to the first substream's listener while holding
// transport1Lock.
Thread transport1Activity = new Thread(new Runnable() {
@Override
public void run() {
transport1Lock.lock();
try {
sublistenerCaptor1.getValue().headersRead(new Metadata());
latch.countDown();
} finally {
transport1Lock.unlock();
}
}
}, "Thread-transport1");
transport1Activity.start();
// Transport2 thread: delivers closed() with a non-fatal status to the second substream's
// listener while holding transport2Lock.
Thread transport2Activity = new Thread(new Runnable() {
@Override
public void run() {
transport2Lock.lock();
try {
sublistenerCaptor2.getValue()
.closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), REFUSED, new Metadata());
} finally {
transport2Lock.unlock();
// NOTE(review): ReentrantLock hold counts are per thread, so if this thread acquired
// transport1Lock inside the newSubstream answer, this tryLock/unlock pair does not release
// that earlier hold - confirm this is intended.
if (transport1Lock.tryLock()) {
transport1Lock.unlock();
}
}
}
}, "Thread-transport2");
transport2Activity.start();
// Wait for headersRead() to return on the transport1 thread, then run whatever is due on the
// fake clock from the test thread.
latch.await();
fakeClock.runDueTasks();
// Both transport threads must terminate; otherwise the test timeout fires.
transport2Activity.join();
transport1Activity.join();
}

@Test
public void hedging_pushback_positive() {
ClientStream mockStream1 = mock(ClientStream.class);
Expand Down

0 comments on commit 099c624

Please sign in to comment.