Commit 635b51e

Author: 史大洋 (committed)
[spark] Support data evolution for bucket table
1 parent b7a42cd commit 635b51e

File tree

8 files changed, +132 -23 lines changed


docs/content/append-table/row-tracking.md

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ CREATE TABLE part_t (
 WITH ('row-tracking.enabled' = 'true');
 ```
 Notice that:
-- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
+- Row tracking is only supported for unaware or hash_fixed bucket append tables, not for primary key tables.
 - Only spark support update, merge into and delete operations on row-tracking tables, Flink SQL does not support these operations yet.
 - This function is experimental, this line will be removed after being stable.
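With this change, row tracking is no longer limited to unaware append tables; a fixed-bucket (hash_fixed) append table can enable it as well. A minimal DDL sketch, modeled on the test added later in this commit (the table and column names are illustrative):

```scala
// Sketch only, mirroring the new test in this commit; names are illustrative.
sql(
  "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (" +
    "'row-tracking.enabled' = 'true', " +
    "'data-evolution.enabled' = 'true', " +
    "'bucket' = '2', " +
    "'bucket-key' = 'id')")
```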

paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java

Lines changed: 28 additions & 1 deletion
@@ -127,6 +127,31 @@ public TableSchema(
         numBucket = CoreOptions.fromMap(options).bucket();
     }

+    private TableSchema(
+            int version,
+            long id,
+            List<DataField> fields,
+            int highestFieldId,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options,
+            @Nullable String comment,
+            long timeMillis,
+            List<String> bucketKeys,
+            int numBucket) {
+        this.version = version;
+        this.id = id;
+        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
+        this.highestFieldId = highestFieldId;
+        this.partitionKeys = partitionKeys;
+        this.primaryKeys = primaryKeys;
+        this.options = options;
+        this.comment = comment;
+        this.timeMillis = timeMillis;
+        this.bucketKeys = bucketKeys;
+        this.numBucket = numBucket;
+    }
+
     public int version() {
         return version;
     }
@@ -294,7 +319,9 @@ public TableSchema project(@Nullable List<String> writeCols) {
                 primaryKeys,
                 options,
                 comment,
-                timeMillis);
+                timeMillis,
+                bucketKeys,
+                numBucket);
     }

     private List<DataField> projectedDataFields(List<String> projectedFieldNames) {

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 1 addition & 1 deletion
@@ -632,7 +632,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
         boolean rowTrackingEnabled = options.rowTrackingEnabled();
         if (rowTrackingEnabled) {
             checkArgument(
-                    options.bucket() == -1,
+                    options.bucket() == -1 || options.bucket() > 0,
                     "Cannot define %s for row tracking table, it only support bucket = -1",
                     CoreOptions.BUCKET.key());
             checkArgument(
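Read as a predicate, the relaxed check accepts either the unaware-bucket default (bucket = -1) or any positive fixed bucket count; any other value still fails this checkArgument. A minimal sketch of the condition:

```scala
// Sketch of the relaxed validation condition above.
def passesRowTrackingBucketCheck(bucket: Int): Boolean =
  bucket == -1 || bucket > 0

assert(passesRowTrackingBucketCheck(-1)) // unaware append table (no bucket defined)
assert(passesRowTrackingBucketCheck(2))  // hash_fixed bucket table, e.g. 'bucket' = '2'
assert(!passesRowTrackingBucketCheck(0)) // still rejected
```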

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala

Lines changed: 6 additions & 5 deletions
@@ -40,7 +40,7 @@ import scala.collection.mutable.ListBuffer
 case class DataEvolutionSparkTableWrite(
     writeBuilder: BatchWriteBuilder,
     writeType: RowType,
-    firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]],
+    firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]],
     blobAsDescriptor: Boolean,
     catalogContext: CatalogContext)
   extends SparkTableWriteTrait {
@@ -68,7 +68,7 @@ case class DataEvolutionSparkTableWrite(

   def newCurrentWriter(firstRowId: Long): Unit = {
     finishCurrentWriter()
-    val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
+    val (partition, bucket, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null)
     if (partition == null) {
       throw new IllegalArgumentException(
         s"First row ID $firstRowId not found in partition map. " +
@@ -81,8 +81,8 @@ case class DataEvolutionSparkTableWrite(
       .asInstanceOf[TableWriteImpl[InternalRow]]
       .getWrite
       .asInstanceOf[AbstractFileStoreWrite[InternalRow]]
-      .createWriter(partition, 0)
-    currentWriter = PerFileWriter(partition, firstRowId, writer, numRecords)
+      .createWriter(partition, bucket)
+    currentWriter = PerFileWriter(partition, bucket, firstRowId, writer, numRecords)
   }

   def finishCurrentWriter(): Unit = {
@@ -122,6 +122,7 @@ case class DataEvolutionSparkTableWrite(

   private case class PerFileWriter(
       partition: BinaryRow,
+      bucket: Int,
       firstRowId: Long,
       recordWriter: RecordWriter[InternalRow],
       numRecords: Long) {
@@ -149,7 +150,7 @@ case class DataEvolutionSparkTableWrite(
       val dataFileMeta = dataFiles.get(0).assignFirstRowId(firstRowId)
       new CommitMessageImpl(
         partition,
-        0,
+        bucket,
         null,
         new DataIncrement(
           java.util.Arrays.asList(dataFileMeta),

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala

Lines changed: 30 additions & 1 deletion
@@ -18,15 +18,19 @@

 package org.apache.paimon.spark.catalyst.analysis

+import org.apache.paimon.CoreOptions.BUCKET_KEY
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}
+import org.apache.paimon.utils.StringUtils

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule

+import java.util
+
 import scala.collection.JavaConverters._

 trait PaimonMergeIntoBase
@@ -72,6 +76,13 @@ trait PaimonMergeIntoBase
       resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled)

     if (dataEvolutionEnabled) {
+      val bucketKeySt = v2Table.getTable.options().get(BUCKET_KEY.key)
+      if (!StringUtils.isNullOrWhitespaceOnly(bucketKeySt)) {
+        checkUpdateActionValidityForBucketKey(
+          AttributeSet(targetOutput),
+          updateActions,
+          bucketKeySt.split(",").toSeq)
+      }
       MergeIntoPaimonDataEvolutionTable(
         v2Table,
         merge.targetTable,
@@ -142,7 +153,7 @@ trait PaimonMergeIntoBase
     lazy val isMergeConditionValid = {
       val mergeExpressions = splitConjunctivePredicates(mergeCondition)
       primaryKeys.forall {
-        primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, mergeExpressions, primaryKey)
+        primaryKey => isUpdateExpressionForKey(targetOutput, mergeExpressions, primaryKey)
       }
     }

@@ -156,4 +167,22 @@ trait PaimonMergeIntoBase
       throw new RuntimeException("Can't update the primary key column in update clause.")
     }
   }
+
+  /** This check will avoid to update the bucket key columns */
+  private def checkUpdateActionValidityForBucketKey(
+      targetOutput: AttributeSet,
+      actions: Seq[UpdateAction],
+      bucketKeys: Seq[String]): Unit = {
+
+    // Check whether there are an update expression related to any primary key.
+    def isUpdateActionValid(action: UpdateAction): Boolean = {
+      validUpdateAssignment(targetOutput, bucketKeys, action.assignments)
+    }
+
+    val valid = actions.forall(isUpdateActionValid)
+    if (!valid) {
+      throw new RuntimeException(
+        "Can't update the bucket key column in data-evolution update clause.")
+    }
+  }
 }
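The new guard mirrors the existing primary-key rule: in a MERGE on a data-evolution table, UPDATE assignments may change ordinary columns but must not touch any column listed in bucket-key. A compact test-style illustration, assuming a target table t bucketed by id as in the tests added later in this commit:

```scala
// Accepted: the UPDATE assignment leaves the bucket key ('id') untouched.
sql("MERGE INTO t USING s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.b = s.b")

// Rejected by checkUpdateActionValidityForBucketKey with
// "Can't update the bucket key column in data-evolution update clause."
sql("MERGE INTO t USING s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.id = s.id")
```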

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala

Lines changed: 8 additions & 10 deletions
@@ -41,23 +41,21 @@ trait RowLevelHelper extends SQLConfHelper {

   protected def validUpdateAssignment(
       output: AttributeSet,
-      primaryKeys: Seq[String],
+      keys: Seq[String],
       assignments: Seq[Assignment]): Boolean = {
-    !primaryKeys.exists {
-      primaryKey => isUpdateExpressionToPrimaryKey(output, assignments, primaryKey)
-    }
+    !keys.exists(key => isUpdateExpressionForKey(output, assignments, key))
   }

   // Check whether there is an update expression related to primary key.
-  protected def isUpdateExpressionToPrimaryKey(
+  protected def isUpdateExpressionForKey(
       output: AttributeSet,
       expressions: Seq[Expression],
-      primaryKey: String): Boolean = {
+      key: String): Boolean = {
     val resolver = conf.resolver

     // Check whether this attribute is same to primary key and is from target table.
-    def isTargetPrimaryKey(attr: AttributeReference): Boolean = {
-      resolver(primaryKey, attr.name) && output.contains(attr)
+    def isTargetKey(attr: AttributeReference): Boolean = {
+      resolver(key, attr.name) && output.contains(attr)
     }

     expressions
@@ -67,9 +65,9 @@ trait RowLevelHelper extends SQLConfHelper {
       }
       .exists {
         case EqualTo(left: AttributeReference, right: AttributeReference) =>
-          isTargetPrimaryKey(left) || isTargetPrimaryKey(right)
+          isTargetKey(left) || isTargetKey(right)
         case Assignment(key: AttributeReference, _) =>
-          isTargetPrimaryKey(key)
+          isTargetKey(key)
         case _ => false
       }
   }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala

Lines changed: 6 additions & 4 deletions
@@ -38,22 +38,24 @@ import scala.collection.mutable

 case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteHelper {

-  private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]] =
+  private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]] =
     initPartitionMap()
   override val table: FileStoreTable = paimonTable.copy(dynamicOp)

   @transient private lazy val serializer = new CommitMessageSerializer

-  private def initPartitionMap(): mutable.HashMap[Long, Tuple2[BinaryRow, Long]] = {
-    val firstRowIdToPartitionMap = new mutable.HashMap[Long, Tuple2[BinaryRow, Long]]
+  private def initPartitionMap(): mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]] = {
+    val firstRowIdToPartitionMap = new mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]]
     table
       .store()
       .newScan()
       .readFileIterator()
       .forEachRemaining(
         k =>
           firstRowIdToPartitionMap
-            .put(k.file().firstRowId(), Tuple2.apply(k.partition(), k.file().rowCount())))
+            .put(
+              k.file().firstRowId(),
+              Tuple3.apply(k.partition(), k.bucket(), k.file().rowCount())))
     firstRowIdToPartitionMap
   }
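The scan-time index now records the bucket alongside the partition and row count, so the Spark write path can recreate each file's writer, and emit its commit message, against the file's original bucket rather than a hard-coded bucket 0. A self-contained sketch of the lookup shape (a plain String stands in for Paimon's BinaryRow partition):

```scala
import scala.collection.mutable

// firstRowId -> (partition, bucket, rowCount); a String stands in for BinaryRow.
val index = new mutable.HashMap[Long, (String, Int, Long)]
index.put(0L, ("dt=20240101", 0, 100L))
index.put(100L, ("dt=20240101", 1, 50L))

// Rewriting the file whose first row id is 100 must target bucket 1,
// both when creating the writer and in the resulting commit message.
val (partition, bucket, numRecords) = index(100L)
println(s"partition=$partition bucket=$bucket numRecords=$numRecords")
```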

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala

Lines changed: 52 additions & 0 deletions
@@ -327,6 +327,58 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
     }
   }

+  test("Data Evolution: merge into bucket table with data-evolution") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT)")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (" +
+          "'row-tracking.enabled' = 'true', " +
+          "'data-evolution.enabled' = 'true', " +
+          "'bucket'='2', " +
+          "'bucket-key'='id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET t.b = s.b
+            |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+            |""".stripMargin)
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
+      )
+    }
+  }
+
+  test("Data Evolution: merge into bucket table with data-evolution update bucket key") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT)")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (" +
+          "'row-tracking.enabled' = 'true', " +
+          "'data-evolution.enabled' = 'true', " +
+          "'bucket'='2', " +
+          "'bucket-key'='id')")
+
+      assertThrows[RuntimeException] {
+        sql("""
+              |MERGE INTO t
+              |USING s
+              |ON t.id = s.id
+              |WHEN MATCHED THEN UPDATE SET t.id = s.id
+              |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+              |""".stripMargin)
+      }
+    }
+  }
+
   test("Data Evolution: update table throws exception") {
     withTable("t") {
       sql(
