Commit bd1873d

Add option to fall back to full table scan if files are deleted by the cleaner
harsh1231 committed Oct 30, 2023
1 parent 4f723fb commit bd1873d
Showing 8 changed files with 19 additions and 13 deletions.
@@ -209,7 +209,7 @@ object DataSourceReadOptions {
" by carefully analyzing provided partition-column predicates and deducing corresponding partition-path prefix from " +
" them (if possible).")

- val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
+ val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
.markAdvanced()
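Context for readers (not part of the diff): the rename touches only the constant's Scala name; the option key itself, hoodie.datasource.read.incr.fallback.fulltablescan.enable, is unchanged. A minimal sketch of enabling it on an incremental read, mirroring the test further down; basePath, startTs, and endTs are placeholders:

  // Sketch only: opt in to a full table scan when part of the
  // incremental window is no longer readable.
  val hoodieIncViewDF = spark.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
    .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
    .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(), "true")
    .load(basePath)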
@@ -212,8 +212,8 @@ class IncrementalRelation(val sqlContext: SQLContext,
// 1. the start commit is archived
// 2. the end commit is archived
// 3. there are files in metadata be deleted
- val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
-   DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+ val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key,
+   DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean

val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))

@@ -158,8 +158,8 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// 2. the end commit is archived
// 3. there are files in metadata be deleted
protected lazy val fullTableScan: Boolean = {
- val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
-   DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+ val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key,
+   DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean

fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath)))
}
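Unpacking the gate above (a restatement for readability, not new code in the commit; names as in the hunk): the scan only widens when the user opted in and the requested window is no longer fully readable:

  // opt-in flag AND at least one unreadable condition: start commit archived,
  // end commit archived, or a touched file has since been removed from
  // storage (e.g. by the cleaner)
  fallbackToFullTableScan &&
    (startInstantArchived || endInstantArchived ||
      affectedFilesInCommits.exists(f => !metaClient.getFs.exists(f.getPath)))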
@@ -252,7 +252,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {

// verify incremental query
verifySQLQueries(numRecordsForFirstQuery, numRecordsForSecondQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant)
- commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true")
+ commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key -> "true")
// TODO: https://issues.apache.org/jira/browse/HUDI-6657 - Investigate why below assertions fail with full table scan enabled.
//verifySQLQueries(numRecordsForFirstQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant)

@@ -164,7 +164,7 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
- .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), fallBackFullTableScan)
+ .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(), fallBackFullTableScan)
.load(basePath)
assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
}
@@ -41,7 +41,7 @@

import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
- import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES;
+ import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN;
import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
@@ -184,9 +184,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
.option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
- .option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-     props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-         INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+ .option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
+     props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
+         INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
.option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), handlingMode.name())
.load(srcPath);
} else {
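Since fetchNextBatch resolves the flag through props.getString, a streamer job can opt in via its source properties; a one-line sketch, assuming a TypedProperties instance named props as in the surrounding code:

  // Sketch only: picked up by fetchNextBatch and forwarded to the
  // Spark read options shown above
  props.setProperty(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(), "true")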
@@ -44,6 +44,7 @@
*/
public class QueryRunner {
private final SparkSession sparkSession;
+ private final TypedProperties props;
private final String sourcePath;

private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
@@ -52,6 +53,7 @@ public QueryRunner(SparkSession sparkSession, TypedProperties props) {
this.sparkSession = sparkSession;
checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
+ this.props = props;
}

/**
@@ -85,7 +87,11 @@ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
return sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
- .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
+ .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
+ .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
+     props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
+         DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
+ .load(sourcePath);
}

public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
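With props now held on the instance, every incremental query issued by QueryRunner inherits the fallback setting; a sketch of the wiring, assuming an existing SparkSession, a QueryInfo, and props that already satisfy the HOODIE_SRC_BASE_PATH check in the constructor:

  // Sketch only: props carries both the source base path and,
  // optionally, the fallback flag
  val runner = new QueryRunner(sparkSession, props)
  val rows = runner.runIncrementalQuery(queryInfo) // Dataset[Row]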
@@ -2212,7 +2212,7 @@ public void testHoodieIncrFallback() throws Exception {

// Remove source.hoodieincr.num_instants config
downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1);
- downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
+ downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key() + "=true");
//Adding this conf to make testing easier :)
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
downstreamCfg.operation = WriteOperationType.UPSERT;
