Skip to content

Commit

Permalink
[HUDI-5456] Flink streaming read skips uncommitted instants (apache#7540
Browse files Browse the repository at this point in the history
)

Actually introduced by HUDI-5234.
  • Loading branch information
danny0405 authored Dec 23, 2022
1 parent bdfaa4e commit 2a472f4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -66,8 +67,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Serializable;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
Expand Down Expand Up @@ -482,7 +481,7 @@ public List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
return commitTimeline
return completedTimeline
.getInstantsAsStream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ void testFilterInstantsWithRange() {
List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, null);
assertEquals(3, instantRange3.size());
assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3);

// add an inflight instant which should be excluded
HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "4");
timeline.createNewInstant(commit4);
timeline = timeline.reload();
assertEquals(4, timeline.getInstants().size());
List<HoodieInstant> instantRange4 = iis.filterInstantsWithRange(timeline, null);
assertEquals(3, instantRange4.size());
}

}

0 comments on commit 2a472f4

Please sign in to comment.