Skip to content

Commit

Permalink
Add option to fallback to full table scan if files are deleted due to cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh1231 committed Oct 30, 2023
1 parent 4f723fb commit 2731e3d
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ public class QueryRunner {
// Spark session on which the reads in this class are issued.
private final SparkSession sparkSession;
// Value resolved from HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; used as the path handed to load().
private final String sourcePath;

// Properties received by the constructor, retained so read options can be looked up per query.
private final TypedProperties props;

// Class-level SLF4J-style logger obtained from LoggerFactory for QueryRunner.
private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);

/**
 * Builds a runner bound to a Spark session and a set of source properties.
 *
 * <p>{@code checkRequiredConfigProperties} is invoked for
 * {@code HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH} before any field is assigned; the
 * source path is then resolved from the same property via {@code getStringWithAltKeys}.
 *
 * @param sparkSession session stored for later reads
 * @param props properties that supply the source base path; also stored on the instance
 */
public QueryRunner(SparkSession sparkSession, TypedProperties props) {
// Validate first: nothing below is useful without the base path setting.
checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
this.props = props;
this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
this.sparkSession = sparkSession;
}

/**
Expand Down Expand Up @@ -85,7 +88,11 @@ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
return sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
.option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
.load(sourcePath);
}

public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
Expand Down

0 comments on commit 2731e3d

Please sign in to comment.