Skip to content

Commit 23bd23b

Browse files
[FLINK-13249][runtime] Fix handling of partition producer responses b… (apache#9138)
* [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor * Review comments
1 parent fbd8a4f commit 23bd23b

File tree

5 files changed

+23
-22
lines changed

5 files changed

+23
-22
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
2323
import org.apache.flink.types.Either;
2424

25-
import java.util.concurrent.CompletableFuture;
25+
import java.util.function.Consumer;
2626

2727
/**
2828
* Request execution state of partition producer, the response accepts state check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
3434
* @param intermediateDataSetId ID of the parent intermediate data set.
3535
* @param resultPartitionId ID of the result partition to check. This
3636
* identifies the producing execution and partition.
37-
* @return a future with response handle.
37+
* @param responseConsumer consumer for the response handle.
3838
*/
39-
CompletableFuture<? extends ResponseHandle> requestPartitionProducerState(
39+
void requestPartitionProducerState(
4040
IntermediateDataSetID intermediateDataSetId,
41-
ResultPartitionID resultPartitionId);
41+
ResultPartitionID resultPartitionId,
42+
Consumer<? super ResponseHandle> responseConsumer);
4243

4344
/**
4445
* Result of state query, accepts state check callbacks.

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -601,8 +601,8 @@ void notifyChannelNonEmpty(InputChannel channel) {
601601
void triggerPartitionStateCheck(ResultPartitionID partitionId) {
602602
partitionProducerStateProvider.requestPartitionProducerState(
603603
consumedResultId,
604-
partitionId)
605-
.thenAccept(responseHandle -> {
604+
partitionId,
605+
((PartitionProducerStateProvider.ResponseHandle responseHandle) -> {
606606
boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName)
607607
.isProducerReadyOrAbortConsumption(responseHandle);
608608
if (isProducingState) {
@@ -612,7 +612,7 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) {
612612
responseHandle.failConsumption(t);
613613
}
614614
}
615-
});
615+
}));
616616
}
617617

618618
private void queueChannel(InputChannel channel) {

flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import java.util.concurrent.RejectedExecutionException;
100100
import java.util.concurrent.atomic.AtomicBoolean;
101101
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
102+
import java.util.function.Consumer;
102103

103104
import static org.apache.flink.util.Preconditions.checkArgument;
104105
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1080,18 +1081,21 @@ else if (current == ExecutionState.RUNNING) {
10801081
// ------------------------------------------------------------------------
10811082

10821083
@Override
1083-
public CompletableFuture<PartitionProducerStateResponseHandle> requestPartitionProducerState(
1084+
public void requestPartitionProducerState(
10841085
final IntermediateDataSetID intermediateDataSetId,
1085-
final ResultPartitionID resultPartitionId) {
1086+
final ResultPartitionID resultPartitionId,
1087+
Consumer<? super ResponseHandle> responseConsumer) {
1088+
10861089
final CompletableFuture<ExecutionState> futurePartitionState =
10871090
partitionProducerStateChecker.requestPartitionProducerState(
10881091
jobId,
10891092
intermediateDataSetId,
10901093
resultPartitionId);
1091-
final CompletableFuture<PartitionProducerStateResponseHandle> result =
1092-
futurePartitionState.handleAsync(PartitionProducerStateResponseHandle::new, executor);
1093-
FutureUtils.assertNoException(result);
1094-
return result;
1094+
1095+
FutureUtils.assertNoException(
1096+
futurePartitionState
1097+
.handle(PartitionProducerStateResponseHandle::new)
1098+
.thenAcceptAsync(responseConsumer, executor));
10951099
}
10961100

10971101
// ------------------------------------------------------------------------

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,19 @@
2121
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
2222
import org.apache.flink.runtime.io.network.buffer.BufferPool;
2323
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
24-
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider.ResponseHandle;
2524
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
2625
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
2726
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
2827
import org.apache.flink.util.function.SupplierWithException;
2928

3029
import java.io.IOException;
31-
import java.util.concurrent.CompletableFuture;
3230

3331
/**
3432
* Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
3533
*/
3634
public class SingleInputGateBuilder {
3735

38-
private static final CompletableFuture<ResponseHandle> NO_OP_PRODUCER_CHECKER_RESULT = new CompletableFuture<>();
39-
40-
public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id) -> NO_OP_PRODUCER_CHECKER_RESULT;
36+
public static final PartitionProducerStateProvider NO_OP_PRODUCER_CHECKER = (dsid, id, consumer) -> {};
4137

4238
private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
4339

flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ public void testTriggerPartitionStateUpdate() throws Exception {
656656
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
657657
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
658658

659-
task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
659+
task.requestPartitionProducerState(resultId, partitionId, checkResult ->
660660
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
661661
);
662662

@@ -680,7 +680,7 @@ public void testTriggerPartitionStateUpdate() throws Exception {
680680
final CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
681681
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
682682

683-
task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult ->
683+
task.requestPartitionProducerState(resultId, partitionId, checkResult ->
684684
assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), is(false))
685685
);
686686

@@ -711,7 +711,7 @@ public void testTriggerPartitionStateUpdate() throws Exception {
711711
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
712712
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
713713

714-
task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
714+
task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
715715
if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
716716
callCount.incrementAndGet();
717717
}
@@ -749,7 +749,7 @@ public void testTriggerPartitionStateUpdate() throws Exception {
749749
CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
750750
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
751751

752-
task.requestPartitionProducerState(resultId, partitionId).thenAccept(checkResult -> {
752+
task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
753753
if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
754754
callCount.incrementAndGet();
755755
}

0 commit comments

Comments
 (0)