Skip to content

Commit

Permalink
[core] remove Dynamic Bucket Row (apache#2541)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Dec 20, 2023
1 parent 2449c32 commit 86b6350
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 180 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* {@link KeyAndBucketExtractor} for {@link InternalRow} with extracting bucket from {@link
* DynamicBucketRow}.
* {@link KeyAndBucketExtractor} for {@link InternalRow}; it throws an error when asked to
* extract a bucket, because buckets must be assigned externally in dynamic bucket mode.
*/
public class DynamicBucketRowKeyExtractor extends RowKeyExtractor {

Expand All @@ -41,10 +41,6 @@ public DynamicBucketRowKeyExtractor(TableSchema schema) {

@Override
public int bucket() {
if (record instanceof DynamicBucketRow) {
return ((DynamicBucketRow) record).bucket();
}
throw new IllegalArgumentException(
"Only supports DynamicBucketRow, illegal record type: " + record.getClass());
throw new IllegalArgumentException("Can't extract bucket from row in dynamic bucket mode.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface TableWrite extends AutoCloseable {
/** Write a row to the writer. */
void write(InternalRow row) throws Exception;

    /**
     * Write a row to the writer into the given bucket, instead of computing the bucket from the
     * row (used in dynamic bucket mode, where buckets are assigned externally).
     *
     * @param row the row to write
     * @param bucket the pre-assigned bucket for this row
     * @throws Exception if the write fails
     */
    void write(InternalRow row, int bucket) throws Exception;

/**
* Compact a bucket of a partition. By default, it will determine whether to perform the
* compaction according to the 'num-sorted-run.compaction-trigger' option. If fullCompaction is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,23 @@ public void write(InternalRow row) throws Exception {
writeAndReturn(row);
}

    /**
     * Writes a row using a caller-supplied bucket, bypassing bucket extraction from the row.
     *
     * @param row the row to write
     * @param bucket the pre-assigned bucket for this row
     * @throws Exception if the underlying write fails
     */
    @Override
    public void write(InternalRow row, int bucket) throws Exception {
        writeAndReturn(row, bucket);
    }

public SinkRecord writeAndReturn(InternalRow row) throws Exception {
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
return record;
}

public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
SinkRecord record = toSinkRecord(row, bucket);
write.write(record.partition(), bucket, recordExtractor.extract(record));
return record;
}

@VisibleForTesting
public T writeAndReturnData(InternalRow row) throws Exception {
SinkRecord record = toSinkRecord(row);
Expand All @@ -139,6 +150,15 @@ private SinkRecord toSinkRecord(InternalRow row) {
row);
}

private SinkRecord toSinkRecord(InternalRow row, int bucket) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
keyAndBucketExtractor.partition(),
bucket,
keyAndBucketExtractor.trimmedPrimaryKey(),
row);
}

public SinkRecord toLogRecord(SinkRecord record) {
keyAndBucketExtractor.setRecord(record.row());
return new SinkRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -135,8 +135,8 @@ private static DataFileMeta newFile(long timeMillis) {
.toLocalDateTime()));
}

private DynamicBucketRow row(int pt, int col, int pk, int bucket) {
private Pair<InternalRow, Integer> row(int pt, int col, int pk, int bucket) {
GenericRow row = GenericRow.of(pt, col, pk);
return new DynamicBucketRow(row, bucket);
return Pair.of(row, bucket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -64,8 +65,8 @@ protected Options tableOptions() {
return options;
}

private DynamicBucketRow createRow(int partition, int bucket, int key, int value) {
return new DynamicBucketRow(GenericRow.of(partition, key, value), bucket);
private Pair<InternalRow, Integer> createRow(int partition, int bucket, int key, int value) {
return Pair.of(GenericRow.of(partition, key, value), bucket);
}

private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> messages) {
Expand All @@ -89,11 +90,11 @@ private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> messag
@Test
public void testAssignBucket() throws Exception {
assertThatThrownBy(() -> write.write(GenericRow.of(1, 1, 1)))
.hasMessageContaining("Only supports DynamicBucketRow");
.hasMessageContaining("Can't extract bucket from row in dynamic bucket mode.");

// commit two partitions
write.write(createRow(1, 1, 1, 1));
write.write(createRow(2, 2, 2, 2));
write(write, createRow(1, 1, 1, 1));
write(write, createRow(2, 2, 2, 2));
List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
Map<BinaryRow, Map<Integer, int[]>> index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1), row(2));
Expand All @@ -103,7 +104,7 @@ public void testAssignBucket() throws Exception {
commit.commit(0, commitMessages);

// only one partition
write.write(createRow(1, 1, 2, 2));
write(write, createRow(1, 1, 2, 2));
commitMessages = write.prepareCommit(true, 1);
index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1));
Expand All @@ -113,7 +114,7 @@ public void testAssignBucket() throws Exception {

// restore
write = writeBuilder.newWrite();
write.write(createRow(1, 1, 3, 3));
write(write, createRow(1, 1, 3, 3));
commitMessages = write.prepareCommit(true, 2);
index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1));
Expand All @@ -128,16 +129,21 @@ public void testAssignBucket() throws Exception {
@Test
public void testNotCreateNewFile() throws Exception {
// commit two partitions
write.write(createRow(1, 1, 1, 1));
write.write(createRow(2, 2, 2, 2));
write(write, createRow(1, 1, 1, 1));
write(write, createRow(2, 2, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));

// same record
write.write(createRow(1, 1, 1, 1));
write(write, createRow(1, 1, 1, 1));
List<CommitMessage> commitMessages = write.prepareCommit(true, 1);
assertThat(readIndex(commitMessages)).isEmpty();

write.close();
commit.close();
}

    /**
     * Unpacks a (row, bucket) pair and forwards it to {@link StreamTableWrite#write}, which
     * takes the bucket as an explicit argument.
     *
     * @param write the writer to forward to
     * @param rowWithBucket the row (key) paired with its pre-assigned bucket (value)
     * @throws Exception if the underlying write fails
     */
    private void write(StreamTableWrite write, Pair<InternalRow, Integer> rowWithBucket)
            throws Exception {
        write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.paimon.table.sink.BatchWriteBuilderImpl;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Pair;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -57,7 +57,8 @@ public void testOverwriteDynamicBucketTable() throws Exception {
.indexMaintainer;

Assertions.assertThat(indexMaintainer.isEmpty()).isTrue();
batchTableWrite.write(data(0));
Pair<InternalRow, Integer> rowWithBucket = data(0);
batchTableWrite.write(rowWithBucket.getKey(), rowWithBucket.getValue());
Assertions.assertThat(
((CommitMessageImpl) batchTableWrite.prepareCommit().get(0))
.indexIncrement()
Expand All @@ -74,7 +75,8 @@ protected List<CommitMessage> writeDataDefault(int size, int times) throws Excep
try (BatchTableWrite batchTableWrite = builder.newWrite()) {
for (int i = 0; i < times; i++) {
for (int j = 0; j < size; j++) {
batchTableWrite.write(data(i));
Pair<InternalRow, Integer> rowWithBucket = data(i);
batchTableWrite.write(rowWithBucket.getKey(), rowWithBucket.getValue());
}
}
messages = batchTableWrite.prepareCommit();
Expand All @@ -97,13 +99,13 @@ protected Schema schemaDefault() {
return schemaBuilder.build();
}

private static InternalRow data(int bucket) {
private static Pair<InternalRow, Integer> data(int bucket) {
GenericRow row =
GenericRow.of(
RANDOM.nextLong(),
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000));
return new DynamicBucketRow(row, bucket);
return Pair.of(row, bucket);
}
}
Loading

0 comments on commit 86b6350

Please sign in to comment.