From 2a472f4c4367b30544a93c87448156964df12202 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 23 Dec 2022 19:40:08 +0800 Subject: [PATCH] [HUDI-5456] Flink streaming read skips uncommitted instants (#7540) Actually introduced by HUDI-5234. --- .../org/apache/hudi/source/IncrementalInputSplits.java | 5 ++--- .../apache/hudi/source/TestIncrementalInputSplits.java | 8 ++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 0a10c77c2215..92ba50cf1950 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -53,6 +53,7 @@ import javax.annotation.Nullable; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -66,8 +67,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.Serializable; - import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; @@ -482,7 +481,7 @@ public List filterInstantsWithRange( HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants(); if (issuedInstant != null) { // returns early for streaming mode - return commitTimeline + return completedTimeline .getInstantsAsStream() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant)) .collect(Collectors.toList()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java index b42fd2c04a3c..a37d69e7026f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java @@ -83,6 +83,14 @@ void testFilterInstantsWithRange() { List instantRange3 = iis.filterInstantsWithRange(timeline, null); assertEquals(3, instantRange3.size()); assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3); + + // add an inflight instant which should be excluded + HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "4"); + timeline.createNewInstant(commit4); + timeline = timeline.reload(); + assertEquals(4, timeline.getInstants().size()); + List instantRange4 = iis.filterInstantsWithRange(timeline, null); + assertEquals(3, instantRange4.size()); } }