Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[HUDI-7084] Fixing schema retrieval for table w/ no commits #10069

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,12 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio

try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
Option<Schema> existingTableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
if (!existingTableSchema.isPresent()) {
return;
}
Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
Expand Down Expand Up @@ -113,8 +114,12 @@ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
this.hasOperationField = Lazy.lazily(this::hasOperationField);
}

public Schema getTableAvroSchemaFromDataFile() {
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
public Schema getTableAvroSchemaFromDataFile() throws Exception {
return getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
}

private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
}

/**
Expand All @@ -135,7 +140,7 @@ public Schema getTableAvroSchema() throws Exception {
* @throws Exception
*/
public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).orElseThrow(schemaNotFoundError());
}

/**
Expand All @@ -148,7 +153,8 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp)
.lastInstant();
return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant)
.orElseThrow(schemaNotFoundError());
}

/**
Expand All @@ -157,7 +163,7 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
* @param instant as of which table's schema will be fetched
*/
public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception {
return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant));
return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
}

/**
Expand Down Expand Up @@ -188,11 +194,15 @@ public MessageType getTableParquetSchema(boolean includeMetadataField) throws Ex
*/
@Deprecated
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
return getTableAvroSchema(false);
return getTableAvroSchemaInternal(false, Option.empty()).orElseThrow(schemaNotFoundError());
}

public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields) {
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}

private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
Schema schema =
private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
Option<Schema> schema =
(instantOpt.isPresent()
? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
: getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
Expand All @@ -203,18 +213,18 @@ private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<
? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
: tableSchema)
)
.orElseGet(() -> {
Schema schemaFromDataFile = getTableAvroSchemaFromDataFile();
.or(() -> {
Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal();
return includeMetadataFields
? schemaFromDataFile
: HoodieAvroUtils.removeMetadataFields(schemaFromDataFile);
: schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
});

// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) {
return metaClient.getTableConfig().getPartitionFields()
.map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElse(schema);
.map(partitionFields -> appendPartitionColumns(schema.get(), Option.ofNullable(partitionFields)))
.or(() -> schema);
}

return schema;
Expand Down Expand Up @@ -257,7 +267,7 @@ private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, b
/**
* Fetches the schema for a table from any of the table's data files
*/
private MessageType getTableParquetSchemaFromDataFile() {
private Option<MessageType> getTableParquetSchemaFromDataFile() {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
try {
switch (metaClient.getTableType()) {
Expand All @@ -270,10 +280,11 @@ private MessageType getTableParquetSchemaFromDataFile() {
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator();
return fetchSchemaFromFiles(filePaths);
return Option.of(fetchSchemaFromFiles(filePaths));
} else {
throw new IllegalArgumentException("Could not find any data file written for commit, "
LOG.warn("Could not find any data file written for commit, "
codope marked this conversation as resolved.
Show resolved Hide resolved
+ "so could not get schema for table " + metaClient.getBasePath());
return Option.empty();
}
default:
LOG.error("Unknown table type " + metaClient.getTableType());
Expand Down Expand Up @@ -308,7 +319,7 @@ private MessageType convertAvroSchemaToParquet(Schema schema) {
*/
public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
if (metaClient.isTimelineNonEmpty()) {
return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}

return Option.empty();
Expand Down Expand Up @@ -569,4 +580,8 @@ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]>

return dataSchema;
}

private Supplier<Exception> schemaNotFoundError() {
return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static void setPreCombineField(Configuration conf, HoodieTableMetaClient
* @param conf The configuration
* @param metaClient The meta client
*/
public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) {
public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,10 +1141,11 @@ private Schema getSchemaForWriteConfig(Schema targetSchema) {
.build();
int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
if (totalCompleted > 0) {
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
newWriteSchema = schemaResolver.getTableAvroSchema(false);
} catch (IllegalArgumentException e) {
TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
if (tableSchema.isPresent()) {
newWriteSchema = tableSchema.get();
} else {
LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
}
}
Expand Down
Loading