[HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure (#10147)
beyond1920 authored Nov 22, 2023
1 parent 18f7181 commit cda9dbc
Showing 3 changed files with 43 additions and 5 deletions.
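
The failure mode: UpgradeOrDowngradeProcedure built its HoodieWriteConfig from the base path alone, so the three-to-four upgrade handler could not see the table name or database; the diffs below thread both through the write config. A minimal sketch of the round trip this commit fixes, assuming a Spark session with Hudi's Spark SQL extensions enabled and an existing Hudi table hudi_tbl (illustrative name, mirroring the new test):

    // Downgrade to table version THREE, then upgrade back to FOUR.
    spark.sql("call downgrade_table(table => 'hudi_tbl', to_version => 'THREE')")
    spark.sql("call upgrade_table(table => 'hudi_tbl', to_version => 'FOUR')")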
ThreeToFourUpgradeHandler.java
@@ -22,12 +22,14 @@
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
 import java.util.Hashtable;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
@@ -40,6 +42,10 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler {
   @Override
   public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
+    String database = config.getString(DATABASE_NAME);
+    if (StringUtils.nonEmpty(database)) {
+      tablePropsToAdd.put(DATABASE_NAME, database);
+    }
     tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
     // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
     // schema for the files partition is same between the two versions
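
For orientation, the handler above is not called directly; the procedure drives it through Hudi's UpgradeDowngrade entry point. A rough sketch of that call path, using the types imported by the procedure below, where config and jsc come from the surrounding procedure; the constructor argument order and the null instantTime are assumptions, not verified against this Hudi version:

    // Sketch only: drive the three-to-four upgrade the way the procedure does.
    val metaClient = HoodieTableMetaClient.builder
      .setConf(jsc.hadoopConfiguration)
      .setBasePath(config.getBasePath)
      .build
    // Assumed signature: run(toVersion, instantTime).
    new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance)
      .run(HoodieTableVersion.FOUR, null)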
UpgradeOrDowngradeProcedure.scala
@@ -20,16 +20,18 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
-import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig}
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
+import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
@@ -51,9 +53,8 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
-    val basePath = getBasePath(tableName)
-
-    val config = getWriteConfigWithTrue(basePath)
+    val config = getWriteConfigWithTrue(tableName)
+    val basePath = config.getBasePath
     val metaClient = HoodieTableMetaClient.builder
       .setConf(jsc.hadoopConfiguration)
       .setBasePath(config.getBasePath)
@@ -78,12 +79,16 @@
     Seq(Row(result))
   }
 
-  private def getWriteConfigWithTrue(basePath: String) = {
+  private def getWriteConfigWithTrue(tableOpt: scala.Option[Any]) = {
+    val basePath = getBasePath(tableOpt)
+    val (tableName, database) = HoodieCLIUtils.getTableIdentifier(tableOpt.get.asInstanceOf[String])
     HoodieWriteConfig.newBuilder
+      .forTable(tableName)
       .withPath(basePath)
       .withRollbackUsingMarkers(true)
       .withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
       .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
+      .withProps(Map(HoodieTableConfig.DATABASE_NAME.key -> database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)).asJava)
       .build
   }
 
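
The heart of the procedure change is database resolution: HoodieCLIUtils.getTableIdentifier splits the user-supplied identifier, and the write config falls back to the session's current database when none is given. A hypothetical stand-in that sketches just that split — resolveIdentifier is illustrative, not a Hudi API:

    // Hypothetical helper, for illustration only: "db.tbl" vs bare "tbl".
    def resolveIdentifier(qualified: String, currentDb: => String): (String, String) =
      qualified.split('.') match {
        case Array(db, tbl) => (tbl, db)        // explicit "db.table"
        case Array(tbl)     => (tbl, currentDb) // bare name: use the session's current database
        case _              => throw new IllegalArgumentException(s"Unexpected identifier: $qualified")
      }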
TestUpgradeOrDowngradeProcedure.scala
@@ -82,6 +82,33 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call upgrade_table from version three") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // downgrade table to THREE
+      checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true))
+      // upgrade table to FOUR
+      checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true))
+    }
+  }
+
   @throws[IOException]
   private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
     val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)
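
For a quick manual check after running the procedures, the table version can also be read straight from hoodie.properties, much as the helper above does through the meta client. A minimal sketch, assuming a local-filesystem table and the default .hoodie/hoodie.properties layout:

    import java.io.FileInputStream
    import java.util.Properties

    // Reads hoodie.table.version from the table's properties file.
    def tableVersion(tablePath: String): String = {
      val props = new Properties()
      val in = new FileInputStream(s"$tablePath/.hoodie/hoodie.properties")
      try props.load(in) finally in.close()
      props.getProperty("hoodie.table.version") // expect "3" after the downgrade, "4" after the upgrade
    }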
