Commit 1e7acad

[flink][lookup] Check max pending snapshot count for lookup join

1 parent 56e036e

5 files changed (+112, -3 lines)

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -68,6 +68,12 @@
             <td>Duration</td>
             <td>Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.max-pending-snapshot.count</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>The max pending snapshot count between the lookup snapshot and the latest snapshot.</td>
+        </tr>
         <tr>
             <td><h5>lookup.refresh.async</h5></td>
             <td style="word-wrap: break-word;">false</td>
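
Like the other lookup options in this table, the new key should also be settable per query through Flink's dynamic table options hint. A hedged usage sketch, assuming a Paimon catalog with illustrative `orders` and `dim` tables already registered (nothing below is from the commit itself):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LookupJoinHintSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // ... register a Paimon catalog and the illustrative `orders` / `dim` tables ...
            // Raise the limit from the default 5 to 10 for this lookup join only.
            tEnv.executeSql(
                    "SELECT o.order_id, d.name "
                            + "FROM orders AS o JOIN dim "
                            + "/*+ OPTIONS('lookup.max-pending-snapshot.count'='10') */ "
                            + "FOR SYSTEM_TIME AS OF o.proc_time AS d "
                            + "ON o.dim_key = d.dim_key");
        }
    }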

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 7 additions & 0 deletions
@@ -277,6 +277,13 @@ public class FlinkConnectorOptions {
                     .defaultValue(false)
                     .withDescription("Whether to enable async lookup join.");
 
+    public static final ConfigOption<Integer> LOOKUP_MAX_PENDING_SNAPSHOT_COUNT =
+            ConfigOptions.key("lookup.max-pending-snapshot.count")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The max pending snapshot count between the lookup snapshot and the latest snapshot.");
+
     public static final ConfigOption<Integer> LOOKUP_BOOTSTRAP_PARALLELISM =
             ConfigOptions.key("lookup.bootstrap-parallelism")
                     .intType()
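
A minimal resolution sketch (the wrapper class and `main` are illustrative; `Options.fromMap` and the option constant are the mechanism the commit itself uses): an unset key falls back to the declared default.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.FlinkConnectorOptions;
    import org.apache.paimon.options.Options;

    public class OptionDefaultSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>(); // no lookup options set
            int maxPending =
                    Options.fromMap(tableOptions)
                            .get(FlinkConnectorOptions.LOOKUP_MAX_PENDING_SNAPSHOT_COUNT);
            System.out.println(maxPending); // 5, the defaultValue declared above

            tableOptions.put("lookup.max-pending-snapshot.count", "10");
            maxPending =
                    Options.fromMap(tableOptions)
                            .get(FlinkConnectorOptions.LOOKUP_MAX_PENDING_SNAPSHOT_COUNT);
            System.out.println(maxPending); // 10, the table option wins
        }
    }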

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java

Lines changed: 29 additions & 3 deletions
@@ -33,6 +33,7 @@
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -57,6 +58,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_MAX_PENDING_SNAPSHOT_COUNT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
 
 /** Lookup table of full cache. */
@@ -74,12 +76,14 @@ public abstract class FullCacheLookupTable implements LookupTable {
     protected RocksDBStateFactory stateFactory;
     private final ExecutorService refreshExecutor;
     private final AtomicReference<Exception> cachedException;
+    private final int maxPendingSnapshotCount;
+    private final FileStoreTable table;
     private LookupStreamingReader reader;
     private Predicate specificPartition;
 
     public FullCacheLookupTable(Context context) {
         this.context = context;
-        FileStoreTable table = context.table;
+        this.table = context.table;
         List<String> sequenceFields = new ArrayList<>();
         if (table.primaryKeys().size() > 0) {
             sequenceFields = new CoreOptions(table.options()).sequenceField();
@@ -106,10 +110,20 @@ public FullCacheLookupTable(Context context) {
             this.userDefinedSeqComparator = null;
             this.appendUdsFieldNumber = 0;
         }
+
+        Options options = Options.fromMap(context.table.options());
         this.projectedType = projectedType;
-        this.refreshAsync = Options.fromMap(context.table.options()).get(LOOKUP_REFRESH_ASYNC);
-        this.refreshExecutor = this.refreshAsync ? Executors.newSingleThreadExecutor() : null;
+        this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
+        this.refreshExecutor =
+                this.refreshAsync
+                        ? Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory(
+                                        String.format(
+                                                "%s-lookup-refresh",
+                                                Thread.currentThread().getName())))
+                        : null;
         this.cachedException = new AtomicReference<>();
+        this.maxPendingSnapshotCount = options.get(LOOKUP_MAX_PENDING_SNAPSHOT_COUNT);
     }
 
     @Override
@@ -184,6 +198,18 @@ public void refresh() throws Exception {
     }
 
     private void doRefresh() throws Exception {
+        Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+        Long nextSnapshotId = reader.nextSnapshotId();
+        if (latestSnapshotId != null
+                && nextSnapshotId != null
+                && latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
+            throw new Exception(
+                    String.format(
+                            "The latest snapshot id %s is much greater than the next snapshot id %s, exceeding "
+                                    + "the max pending snapshot count %s; you may need to increase the parallelism of lookup operator.",
+                            latestSnapshotId, nextSnapshotId, maxPendingSnapshotCount));
+        }
+
         while (true) {
             try (RecordReaderIterator<InternalRow> batch =
                     new RecordReaderIterator<>(reader.nextBatch(false))) {
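
To make the guard's arithmetic concrete, here is a standalone restatement (an illustrative extraction, not code from the commit): with the default limit of 5, a reader whose next snapshot is 11 while the table is already at snapshot 17 lags by 6 and fails fast, rather than replaying all six snapshots into the local cache in one refresh.

    public class PendingSnapshotGuardSketch {
        // Mirrors the new check in doRefresh(): null means "empty table" or
        // "scan not started", in which case there is no measurable lag.
        static void checkPendingSnapshots(Long latest, Long next, int maxPending) {
            if (latest == null || next == null) {
                return;
            }
            long lag = latest - next;
            if (lag > maxPending) {
                throw new IllegalStateException(
                        String.format(
                                "lookup refresh lags %d snapshots, above the allowed %d",
                                lag, maxPending));
            }
        }

        public static void main(String[] args) {
            checkPendingSnapshots(16L, 11L, 5); // lag 5: still within the limit
            checkPendingSnapshots(17L, 11L, 5); // lag 6 > 5: throws
        }
    }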

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java

Lines changed: 5 additions & 0 deletions
@@ -144,4 +144,9 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Except
         }
         return reader;
     }
+
+    @Nullable
+    public Long nextSnapshotId() {
+        return scan.checkpoint();
+    }
 }
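
The accessor is `@Nullable` because `scan.checkpoint()` has nothing to report before the first incremental plan. A hedged sketch of how a caller can treat the pair of ids the guard compares (the helper class and method are illustrative, not part of the commit):

    import javax.annotation.Nullable;

    public class LookupLagSketch {
        // Returns the pending-snapshot lag, or 0 when either side is unknown.
        static long pendingSnapshots(
                @Nullable Long latestSnapshotId, @Nullable Long nextSnapshotId) {
            return (latestSnapshotId == null || nextSnapshotId == null)
                    ? 0L
                    : latestSnapshotId - nextSnapshotId;
        }

        public static void main(String[] args) {
            System.out.println(pendingSnapshots(11L, 2L)); // 9, the lag the test below reaches
            System.out.println(pendingSnapshots(null, null)); // 0: the guard is skipped
        }
    }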

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java

Lines changed: 65 additions & 0 deletions
@@ -656,6 +656,71 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception
         table.close();
     }
 
+    @Test
+    public void testRefreshMaxPendingSnapshot() throws Exception {
+        FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
+        FullCacheLookupTable.Context context =
+                new FullCacheLookupTable.Context(
+                        storeTable,
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        tempDir.toFile(),
+                        singletonList("f0"));
+        table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
+
+        // bulk load 100_000 records into the lookup table
+        List<Pair<byte[], byte[]>> records = new ArrayList<>();
+        for (int i = 1; i <= 100_000; i++) {
+            InternalRow row = row(i, 11 * i, 111 * i);
+            records.add(Pair.of(table.toKeyBytes(row), table.toValueBytes(row)));
+        }
+        records.sort((o1, o2) -> SortUtil.compareBinary(o1.getKey(), o2.getKey()));
+        TableBulkLoader bulkLoader = table.createBulkLoader();
+        for (Pair<byte[], byte[]> kv : records) {
+            bulkLoader.write(kv.getKey(), kv.getValue());
+        }
+        bulkLoader.finish();
+
+        for (int i = 1; i <= 100_000; i++) {
+            List<InternalRow> result = table.get(row(i));
+            assertThat(result).hasSize(1);
+            assertRow(result.get(0), i, 11 * i, 111 * i);
+        }
+
+        // add one snapshot to the table
+        BatchWriteBuilder writeBuilder = storeTable.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            for (int i = 1; i <= 100; i++) {
+                write.write(row(i, 11 * i, 111 * i), 0);
+            }
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // refresh to consume the pending snapshot
+        table.refresh();
+        // add 10 more snapshots without refreshing; the lag now exceeds the default limit of 5
+        for (int k = 0; k < 10; k++) {
+            try (BatchTableWrite write = writeBuilder.newWrite()) {
+                for (int i = 1; i <= 100; i++) {
+                    write.write(row(i, 11 * i, 111 * i), 0);
+                }
+                try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                    commit.commit(write.prepareCommit());
+                }
+            }
+        }
+
+        assertThatThrownBy(() -> table.refresh())
+                .hasMessageContaining(
+                        "you may need to increase the parallelism of lookup operator");
+
+        table.close();
+    }
+
     private FileStoreTable createDimTable() throws Exception {
         FileIO fileIO = LocalFileIO.create();
         org.apache.paimon.fs.Path tablePath =
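
A rough accounting of why the test trips the guard (assuming the bulk load itself commits no table snapshot): the first commit produces snapshot 1, and `table.refresh()` consumes it, leaving the reader's next snapshot id at 2; the ten further commits raise the latest snapshot id to 11, so the final `refresh()` sees a lag of 11 - 2 = 9, well above the default `lookup.max-pending-snapshot.count` of 5.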
