diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 789cb4bc16d0..2ba0d553f58c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -57,6 +57,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLogCompactException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -1088,6 +1089,9 @@ public void rollbackFailedBootstrap() {
       table.rollbackBootstrap(context, createNewInstantTime());
       LOG.info("Finished rolling back pending bootstrap");
     }
+
+    // if bootstrap failed, let's delete metadata and restart from scratch
+    HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
   }
 
   /**
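The hunk above extends rollbackFailedBootstrap() so that a failed bootstrap also drops the metadata table, forcing the next attempt to rebuild it from scratch instead of trusting partially written metadata. A minimal sketch of that cleanup step in isolation (the helper class and method names are illustrative, not part of the patch; the deleteMetadataTable signature matches the call in the hunk):

    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    class BootstrapCleanupSketch {
      // After rolling back the pending bootstrap instants, wipe the metadata
      // table under the base path so a re-run starts from a clean slate.
      static void cleanupAfterFailedBootstrap(String basePath, HoodieEngineContext context) {
        HoodieTableMetadataUtil.deleteMetadataTable(basePath, context);
      }
    }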
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 7ba20795790e..e697f385e044 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -29,10 +29,12 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -82,7 +84,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 
   protected final HoodieMetadataConfig metadataConfig;
 
+  private final HoodieTableQueryType queryType;
   private final Option<String> specifiedQueryInstant;
+  private final Option<String> beginInstantTime;
+  private final Option<String> endInstantTime;
   private final List<Path> queryPaths;
 
   private final boolean shouldIncludePendingCommits;
@@ -123,6 +128,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
    * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations
    * @param shouldValidateInstant flags to validate whether query instant is present in the timeline
    * @param fileStatusCache transient cache of fetched [[FileStatus]]es
+   * @param beginInstantTime begin instant time for incremental query (optional)
+   * @param endInstantTime end instant time for incremental query (optional)
    */
   public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                   HoodieTableMetaClient metaClient,
@@ -133,7 +140,9 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                   boolean shouldIncludePendingCommits,
                                   boolean shouldValidateInstant,
                                   FileStatusCache fileStatusCache,
-                                  boolean shouldListLazily) {
+                                  boolean shouldListLazily,
+                                  Option<String> beginInstantTime,
+                                  Option<String> endInstantTime) {
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElse(new String[0]);
 
@@ -143,11 +152,14 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
             && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
         .build();
 
+    this.queryType = queryType;
     this.queryPaths = queryPaths;
     this.specifiedQueryInstant = specifiedQueryInstant;
     this.shouldIncludePendingCommits = shouldIncludePendingCommits;
     this.shouldValidateInstant = shouldValidateInstant;
     this.shouldListLazily = shouldListLazily;
+    this.beginInstantTime = beginInstantTime;
+    this.endInstantTime = endInstantTime;
 
     this.basePath = metaClient.getBasePathV2();
 
@@ -300,7 +312,17 @@ protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionP
   protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) {
     List<String> matchedPartitionPaths;
     try {
-      matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
+      if (isPartitionedTable()) {
+        if (queryType == HoodieTableQueryType.INCREMENTAL && beginInstantTime.isPresent()) {
+          HoodieTimeline timelineAfterBeginInstant = TimelineUtils.getCommitsTimelineAfter(metaClient, beginInstantTime.get(), Option.empty());
+          HoodieTimeline timelineToQuery = endInstantTime.map(timelineAfterBeginInstant::findInstantsBeforeOrEquals).orElse(timelineAfterBeginInstant);
+          matchedPartitionPaths = TimelineUtils.getWrittenPartitions(timelineToQuery);
+        } else {
+          matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
+        }
+      } else {
+        matchedPartitionPaths = Collections.singletonList(StringUtils.EMPTY_STRING);
+      }
     } catch (IOException e) {
       throw new HoodieIOException("Error fetching partition paths", e);
     }
@@ -319,6 +341,10 @@ protected void refresh() {
     doRefresh();
   }
 
+  private boolean isPartitionedTable() {
+    return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString());
+  }
+
   protected HoodieTimeline getActiveTimeline() {
     // NOTE: We have to use commits and compactions timeline, to make sure that we're properly
     // handling the following case: when records are inserted into the new log-file w/in the file-group
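The listPartitionPaths() change is the core of the patch: for an INCREMENTAL query with a begin instant, partition paths are derived from the commit timeline (only partitions the queried commits actually wrote to) rather than from a metadata-table prefix lookup, and non-partitioned tables short-circuit to the single empty partition path. A standalone sketch of the timeline-based branch, using only the calls that appear in the hunk (a valid meta client is assumed; the class name is illustrative):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.timeline.TimelineUtils;
    import org.apache.hudi.common.util.Option;

    import java.util.List;

    class IncrementalPartitionListingSketch {
      // Bound the commits timeline to (beginInstantTime, endInstantTime] and
      // collect only the partitions those commits touched.
      static List<String> writtenPartitions(HoodieTableMetaClient metaClient,
                                            String beginInstantTime,
                                            Option<String> endInstantTime) {
        HoodieTimeline afterBegin =
            TimelineUtils.getCommitsTimelineAfter(metaClient, beginInstantTime, Option.empty());
        HoodieTimeline bounded =
            endInstantTime.map(afterBegin::findInstantsBeforeOrEquals).orElse(afterBegin);
        return TimelineUtils.getWrittenPartitions(bounded);
      }
    }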
+    }
+    if (keyGenerator != null && !keyGenerator.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+      props.put("hoodie.datasource.write.partitionpath.field", partitionFieldConfigValue);
+      props.put(HoodieTableConfig.PARTITION_FIELDS.key(), partitionFieldConfigValue);
     }
     return init(getDefaultHadoopConf(), basePath, tableType, props);
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index 7cfa624c764c..e8953450d5f0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -58,7 +58,9 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext,
         shouldIncludePendingCommits,
         true,
         new NoopCache(),
-        false);
+        false,
+        Option.empty(),
+        Option.empty());
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 580a03dae709..9961e46b5014 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -85,7 +85,9 @@ case class HoodieFileIndex(spark: SparkSession,
     configProperties = getConfigProperties(spark, options),
     queryPaths = HoodieFileIndex.getQueryPaths(options),
     specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
-    fileStatusCache = fileStatusCache
+    fileStatusCache = fileStatusCache,
+    beginInstantTime = options.get(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+    endInstantTime = options.get(DataSourceReadOptions.END_INSTANTTIME.key)
   )
     with FileIndex {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 828c8df41deb..f77a6cbe546b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -66,7 +66,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
                                 configProperties: TypedProperties,
                                 queryPaths: Seq[Path],
                                 specifiedQueryInstant: Option[String] = None,
-                                @transient fileStatusCache: FileStatusCache = NoopCache)
+                                @transient fileStatusCache: FileStatusCache = NoopCache,
+                                beginInstantTime: Option[String] = None,
+                                endInstantTime: Option[String] = None)
   extends BaseHoodieTableFileIndex(
     new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
     metaClient,
@@ -77,7 +79,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     false,
     false,
     SparkHoodieTableFileIndex.adapt(fileStatusCache),
-    shouldListLazily(configProperties)
+    shouldListLazily(configProperties),
+    toJavaOption(beginInstantTime),
+    toJavaOption(endInstantTime)
   )
     with SparkAdapterSupport
     with Logging {
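The two Scala changes above thread the existing incremental-read options through HoodieFileIndex into BaseHoodieTableFileIndex so the new partition pruning can use them, while HiveHoodieTableFileIndex passes Option.empty() for both and keeps its old behavior. For reference, an incremental read that exercises the new pruning path looks roughly like this (the instant values are placeholder commit timestamps, not from the patch):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class IncrementalReadExample {
      static Dataset<Row> readIncremental(SparkSession spark, String basePath) {
        // DataSourceReadOptions.BEGIN_INSTANTTIME / END_INSTANTTIME resolve to
        // these option keys on the Spark read path.
        return spark.read()
            .format("hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", "20230401000000000")
            .option("hoodie.datasource.read.end.instanttime", "20230402000000000")
            .load(basePath);
      }
    }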
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index 57243e971256..ee91b2bcf856 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -40,6 +40,7 @@
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -88,6 +89,8 @@ public void setUp() throws IOException {
     properties.setProperty(
         PAYLOAD_ORDERING_FIELD_PROP_KEY,
         HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
+    properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
     metaClient = getHoodieMetaClient(hadoopConf(), basePath(), HoodieTableType.MERGE_ON_READ, properties);
   }
 
@@ -173,6 +176,7 @@ public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
     extraProperties.setProperty(
         WRITE_RECORD_POSITIONS.key(),
         "true");
+    extraProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
     return getConfigBuilder(true)
         .withPath(basePath())
@@ -249,7 +253,7 @@ public void checkDataEquality(int numRecords) {
         .read()
         .options(properties)
         .format("org.apache.hudi")
-        .load(basePath() + "/" + getPartitionPath());
+        .load(basePath());
     List<Row> result = rows.collectAsList();
     assertEquals(numRecords, result.size());
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 8df34768909e..254c1768a62b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -92,6 +92,7 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -201,9 +202,9 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
         : NonpartitionedKeyGenerator.class.getCanonicalName();
     if (deltaCommit) {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
     } else {
-      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass);
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
     }
 
     int totalRecords = 100;
@@ -240,7 +241,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
       break;
     }
-    List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
+    List<String> partitions = partitioned ?
Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03") : Collections.EMPTY_LIST; long timestamp = Instant.now().toEpochMilli(); Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); HoodieWriteConfig config = getConfigBuilder(schema.toString())