Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure #10147

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.MetadataPartitionType;

import java.util.Hashtable;
import java.util.Map;

import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
Expand All @@ -40,6 +42,10 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler {
@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
String database = config.getString(DATABASE_NAME);
if (StringUtils.nonEmpty(database)) {
tablePropsToAdd.put(DATABASE_NAME, database);
}
tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
// if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
// schema for the files partition is same between the two versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
import org.apache.hudi.HoodieCLIUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
Expand All @@ -51,9 +53,8 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi

val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val basePath = getBasePath(tableName)

val config = getWriteConfigWithTrue(basePath)
val config = getWriteConfigWithTrue(tableName)
val basePath = config.getBasePath
val metaClient = HoodieTableMetaClient.builder
.setConf(jsc.hadoopConfiguration)
.setBasePath(config.getBasePath)
Expand All @@ -78,12 +79,16 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
Seq(Row(result))
}

private def getWriteConfigWithTrue(basePath: String) = {
private def getWriteConfigWithTrue(tableOpt: scala.Option[Any]) = {
val basePath = getBasePath(tableOpt)
val (tableName, database) = HoodieCLIUtils.getTableIdentifier(tableOpt.get.asInstanceOf[String])
HoodieWriteConfig.newBuilder
.forTable(tableName)
.withPath(basePath)
.withRollbackUsingMarkers(true)
.withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
.withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
.withProps(Map(HoodieTableConfig.DATABASE_NAME.key -> database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)).asJava)
.build
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,33 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
}
}

test("Test Call upgrade_table from version three") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// downgrade table to THREE
checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true))
// upgrade table to FOUR
checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true))
}
}

@throws[IOException]
private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)
Expand Down
Loading