[HUDI-7096] Improving incremental query to fetch partitions based on commit metadata (apache#10098)
nsivabalan authored Nov 22, 2023
1 parent 2522f6d commit c5af85d
Showing 8 changed files with 62 additions and 11 deletions.
@@ -57,6 +57,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -1088,6 +1089,9 @@ public void rollbackFailedBootstrap() {
table.rollbackBootstrap(context, createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

// if bootstrap failed, lets delete metadata and restart from scratch
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
}

/**
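For context, a minimal standalone sketch (not part of the commit) of the cleanup step this hunk adds: once a failed bootstrap has been rolled back, the metadata table is deleted so the next bootstrap attempt rebuilds it from scratch instead of reusing stale file listings. The class and method names below are hypothetical; only the deleteMetadataTable call is taken from the diff above.

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

public class BootstrapRollbackCleanup {
  // Drops the metadata table under <basePath>/.hoodie/metadata; Hudi will
  // build a fresh metadata table on the next write.
  static void cleanupAfterFailedBootstrap(String basePath, HoodieEngineContext context) {
    HoodieTableMetadataUtil.deleteMetadataTable(basePath, context);
  }
}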
@@ -29,10 +29,12 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -82,7 +84,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {

protected final HoodieMetadataConfig metadataConfig;

private final HoodieTableQueryType queryType;
private final Option<String> specifiedQueryInstant;
private final Option<String> beginInstantTime;
private final Option<String> endInstantTime;
private final List<Path> queryPaths;

private final boolean shouldIncludePendingCommits;
@@ -123,6 +128,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
* @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations
* @param shouldValidateInstant flags to validate whether query instant is present in the timeline
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
* @param beginInstantTime begin instant time for incremental query (optional)
* @param endInstantTime end instant time for incremental query (optional)
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient,
@@ -133,7 +140,9 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
boolean shouldIncludePendingCommits,
boolean shouldValidateInstant,
FileStatusCache fileStatusCache,
boolean shouldListLazily) {
boolean shouldListLazily,
Option<String> beginInstantTime,
Option<String> endInstantTime) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElse(new String[0]);

@@ -143,11 +152,14 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
&& HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
.build();

this.queryType = queryType;
this.queryPaths = queryPaths;
this.specifiedQueryInstant = specifiedQueryInstant;
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;
this.shouldListLazily = shouldListLazily;
this.beginInstantTime = beginInstantTime;
this.endInstantTime = endInstantTime;

this.basePath = metaClient.getBasePathV2();

@@ -300,7 +312,17 @@ protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) {
protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) {
List<String> matchedPartitionPaths;
try {
matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
if (isPartitionedTable()) {
if (queryType == HoodieTableQueryType.INCREMENTAL && beginInstantTime.isPresent()) {
HoodieTimeline timelineAfterBeginInstant = TimelineUtils.getCommitsTimelineAfter(metaClient, beginInstantTime.get(), Option.empty());
HoodieTimeline timelineToQuery = endInstantTime.map(timelineAfterBeginInstant::findInstantsBeforeOrEquals).orElse(timelineAfterBeginInstant);
matchedPartitionPaths = TimelineUtils.getWrittenPartitions(timelineToQuery);
} else {
matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
}
} else {
matchedPartitionPaths = Collections.singletonList(StringUtils.EMPTY_STRING);
}
} catch (IOException e) {
throw new HoodieIOException("Error fetching partition paths", e);
}
@@ -319,6 +341,10 @@ protected void refresh() {
doRefresh();
}

private boolean isPartitionedTable() {
return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString());
}

protected HoodieTimeline getActiveTimeline() {
// NOTE: We have to use commits and compactions timeline, to make sure that we're properly
// handling the following case: when records are inserted into the new log-file w/in the file-group
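The branch added above is the core of the change: for an incremental query on a partitioned table, the affected partitions are derived from commit metadata in the (begin, end] instant range instead of listing every partition through the metadata table. A minimal standalone sketch of that lookup follows; the class and method names are hypothetical, while the TimelineUtils calls mirror the diff above.

import java.util.List;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;

public class IncrementalPartitionLookup {
  static List<String> writtenPartitions(HoodieTableMetaClient metaClient,
                                        String beginInstantTime,
                                        Option<String> endInstantTime) {
    // Commits strictly after the begin instant (the begin time is exclusive).
    HoodieTimeline afterBegin =
        TimelineUtils.getCommitsTimelineAfter(metaClient, beginInstantTime, Option.empty());
    // Cap the timeline at the end instant when one is supplied.
    HoodieTimeline toQuery =
        endInstantTime.map(afterBegin::findInstantsBeforeOrEquals).orElse(afterBegin);
    // Partitions recorded in the commit metadata of the remaining instants.
    return TimelineUtils.getWrittenPartitions(toQuery);
  }
}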
@@ -81,12 +81,20 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator) throws IOException {
return init(basePath, tableType, bootstrapBasePath, bootstrapIndexEnable, keyGenerator, "datestr");
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator,
String partitionFieldConfigValue) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), bootstrapIndexEnable);
if (keyGenerator != null) {
props.put("hoodie.datasource.write.keygenerator.class", keyGenerator);
props.put("hoodie.datasource.write.partitionpath.field", "datestr");
}
if (keyGenerator != null && !keyGenerator.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
props.put("hoodie.datasource.write.partitionpath.field", partitionFieldConfigValue);
props.put(HoodieTableConfig.PARTITION_FIELDS.key(), partitionFieldConfigValue);
}
return init(getDefaultHadoopConf(), basePath, tableType, props);
}
@@ -58,7 +58,9 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext,
shouldIncludePendingCommits,
true,
new NoopCache(),
false);
false,
Option.empty(),
Option.empty());
}

/**
@@ -85,7 +85,9 @@ case class HoodieFileIndex(spark: SparkSession,
configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
fileStatusCache = fileStatusCache,
beginInstantTime = options.get(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
endInstantTime = options.get(DataSourceReadOptions.END_INSTANTTIME.key)
)
with FileIndex {

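For illustration (not part of the commit), a minimal sketch of a Spark incremental read whose begin/end instant options now reach the file index, so only partitions written in that instant range are listed. The table path, instant values, and session settings below are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-incremental-read")   // hypothetical app name
        .master("local[2]")                 // hypothetical local run
        .getOrCreate();

    String basePath = "/tmp/hudi_trips_cow"; // hypothetical table path

    // Standard Hudi incremental read options; after this commit the begin/end
    // instant times are also handed to the file index for partition pruning.
    Dataset<Row> incremental = spark.read()
        .format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", "20231120000000000")
        .option("hoodie.datasource.read.end.instanttime", "20231121000000000")
        .load(basePath);

    incremental.show(false);
    spark.stop();
  }
}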
@@ -66,7 +66,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
configProperties: TypedProperties,
queryPaths: Seq[Path],
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCache = NoopCache)
@transient fileStatusCache: FileStatusCache = NoopCache,
beginInstantTime: Option[String] = None,
endInstantTime: Option[String] = None)
extends BaseHoodieTableFileIndex(
new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
metaClient,
@@ -77,7 +79,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
false,
false,
SparkHoodieTableFileIndex.adapt(fileStatusCache),
shouldListLazily(configProperties)
shouldListLazily(configProperties),
toJavaOption(beginInstantTime),
toJavaOption(endInstantTime)
)
with SparkAdapterSupport
with Logging {
@@ -40,6 +40,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -88,6 +89,8 @@ public void setUp() throws IOException {
properties.setProperty(
PAYLOAD_ORDERING_FIELD_PROP_KEY,
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");
properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(),"partition_path");
metaClient = getHoodieMetaClient(hadoopConf(), basePath(), HoodieTableType.MERGE_ON_READ, properties);
}

@@ -173,6 +176,7 @@ public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
extraProperties.setProperty(
WRITE_RECORD_POSITIONS.key(),
"true");
extraProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");

return getConfigBuilder(true)
.withPath(basePath())
@@ -249,7 +253,7 @@ public void checkDataEquality(int numRecords) {
.read()
.options(properties)
.format("org.apache.hudi")
.load(basePath() + "/" + getPartitionPath());
.load(basePath());
List<Row> result = rows.collectAsList();
assertEquals(numRecords, result.size());
}
@@ -92,6 +92,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -201,9 +202,9 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
: NonpartitionedKeyGenerator.class.getCanonicalName();
if (deltaCommit) {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass);
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
} else {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass);
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
}

int totalRecords = 100;
@@ -240,7 +241,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
break;
}
List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
List<String> partitions = partitioned ? Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03") : Collections.EMPTY_LIST;
long timestamp = Instant.now().toEpochMilli();
Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
HoodieWriteConfig config = getConfigBuilder(schema.toString())