
Conversation

@jonvex
Contributor

@jonvex jonvex commented Oct 26, 2025

Describe the issue this Pull Request addresses

PR #9743 added more schema evolution functionality and schema processing. However, it used the InternalSchema system for various operations such as fixing null ordering, reordering columns, and adding columns. At the time, InternalSchema only had a single Timestamp type, which was assumed to be micros when converting back to Avro. Therefore, if the schema provider had any timestamp-millis columns, the processed schema would end up labeling those columns as timestamp-micros.

PR #13711, which updated column stats with better support for logical types, fixed those schema issues, along with additional issues in the handling and conversion of timestamps during ingestion.

This PR adds functionality to the Spark and Hive readers and writers to automatically repair affected tables.
After switching to the 1.1 binary, the affected columns undergo evolution from timestamp-micros to timestamp-millis. Normally this would be a lossy evolution that is not supported, but here it is safe because the data is actually still timestamp-millis; it is just mislabeled as micros in the Parquet and table schemas.
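
For illustration, a minimal sketch (not code from this PR) of the two Avro logical types involved; in both cases the underlying type is a plain long, which is why relabeling a mislabeled timestamp-micros column back to timestamp-millis does not touch any stored values:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampLabelsSketch {
  public static void main(String[] args) {
    // What the affected data actually is: epoch millis stored in a long.
    Schema millis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    // What the affected table/parquet schemas claim: epoch micros stored in a long.
    Schema micros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

    // Only the logical-type annotation differs; the physical type is LONG in both cases.
    System.out.println(millis); // {"type":"long","logicalType":"timestamp-millis"}
    System.out.println(micros); // {"type":"long","logicalType":"timestamp-micros"}
  }
}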

Summary and Changelog

When reading from a Hudi table with the Spark or Hive reader, if the table schema has a column as timestamp-millis but the data (file) schema has it as timestamp-micros, we assume the column is affected and read it as a millis value instead of a micros value. This correction is also applied to all readers used by the default write paths, so as a table is rewritten its Parquet files become correct. A table's latest snapshot can be fixed immediately by writing one commit with the 1.1 binary and then clustering the entire table.
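
The correction rule can be sketched as follows (hypothetical helper, not the actual AvroSchemaRepair/SchemaRepair implementation): when the table schema declares a long field as timestamp-millis but the file schema declares it as timestamp-micros, the table schema wins and the value is read as millis.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

final class TimestampRepairSketch {
  // Returns the schema to use when decoding a primitive field from the data file.
  // If the table schema says timestamp-millis but the file schema says timestamp-micros,
  // the file is assumed to be mislabeled and the table schema wins.
  static Schema chooseFieldSchema(Schema tableFieldSchema, Schema fileFieldSchema) {
    boolean tableIsMillis = tableFieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    boolean fileIsMicros = fileFieldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros;
    if (tableIsMillis && fileIsMicros && fileFieldSchema.getType() == Schema.Type.LONG) {
      return tableFieldSchema; // read the long as millis; no value conversion needed
    }
    return fileFieldSchema;
  }
}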

Impact

Repair affected tables

Risk Level

High. Extensive testing was done and functional tests were added.

Documentation Update

#14100

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Oct 26, 2025
@jonvex jonvex mentioned this pull request Oct 26, 2025
3 tasks
@yihua yihua added this to the release-1.1.0 milestone Oct 27, 2025
Comment on lines +1211 to +1213
if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
  return oldValue;
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
Contributor

Is this change needed?

Contributor Author

Yeah, Aditya was getting ingest failures from this.
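
For context on why the null check matters, a hedged sketch of timestamp handling during record rewriting (illustrative names; it only considers the millis/micros pair and is not this PR's exact logic): with logical types present on both sides the long value is converted between units, and with a logical type missing on either side the value passes through unchanged.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

final class TimestampValueRewriteSketch {
  // Rewrites a long timestamp value from the old schema's unit to the new schema's unit.
  static long rewriteTimestamp(long oldValue, Schema oldSchema, Schema newSchema) {
    if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
      return oldValue; // no logical-type info on one side: pass the value through unchanged
    }
    boolean oldIsMillis = oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    boolean newIsMillis = newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    if (oldIsMillis && !newIsMillis) {
      return oldValue * 1000L;  // millis -> micros
    }
    if (!oldIsMillis && newIsMillis) {
      return oldValue / 1000L;  // micros -> millis
    }
    return oldValue;            // same unit on both sides
  }
}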

@jonvex jonvex changed the title Timestamp logical final fix(ingest): Fix logical type handling in deltastreamer and repair affected tables Oct 29, 2025
@jonvex jonvex changed the title fix(ingest): Fix logical type handling in deltastreamer and repair affected tables fix(ingest): Repair affected logical timestamp milli tables Oct 29, 2025
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
Schema repairedFileSchema = AvroSchemaRepair.repairLogicalTypes(getSchema(), schema);
Contributor

Let's do this only if the table schema contains a logical ts-millis.

Contributor

The AvroSchemaRepair.repairLogicalTypes will return the original schema if there is no ts-millis.

Contributor

We'll follow up with additional minor perf optimization in a separate PR.
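
To illustrate the no-op behavior being discussed, a minimal sketch (hypothetical, not the actual AvroSchemaRepair code): if the table schema contains no timestamp-millis fields, the original file schema is returned unchanged, so the call is cheap for unaffected tables.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

final class RepairNoOpSketch {
  static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) {
    if (!containsTimestampMillis(tableSchema)) {
      return fileSchema; // fast path: nothing to repair, return the original schema
    }
    // ... otherwise walk both schemas and relabel mislabeled micros fields as millis ...
    return fileSchema;
  }

  static boolean containsTimestampMillis(Schema schema) {
    switch (schema.getType()) {
      case RECORD:
        return schema.getFields().stream().anyMatch(f -> containsTimestampMillis(f.schema()));
      case UNION:
        return schema.getTypes().stream().anyMatch(RepairNoOpSketch::containsTimestampMillis);
      case ARRAY:
        return containsTimestampMillis(schema.getElementType());
      case MAP:
        return containsTimestampMillis(schema.getValueType());
      default:
        return schema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
    }
  }
}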

adjustConfToReadWithFileProduceMode(legacyMode, configuration);
ReadContext readContext = super.init(configuration, keyValueMetaData, fileSchema);
MessageType requestedSchema = readContext.getRequestedSchema();
MessageType requestedSchema = SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema(), tableSchema);
Contributor

same comment as above.

Contributor

The AvroSchemaRepair.repairLogicalTypes will return the original schema if there is no ts-millis.
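
The same fast-path argument applies at the Parquet level, where the call above operates on a MessageType. A hedged sketch (hypothetical helper, not the PR's SchemaRepair class) of detecting timestamp-millis annotations before attempting any repair:

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Type;

final class ParquetMillisCheckSketch {
  // Returns true if any primitive field in the group carries a TIMESTAMP(MILLIS) annotation.
  static boolean containsTimestampMillis(GroupType group) {
    for (Type field : group.getFields()) {
      if (!field.isPrimitive()) {
        if (containsTimestampMillis(field.asGroupType())) {
          return true;
        }
      } else if (field.getLogicalTypeAnnotation()
          instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
        LogicalTypeAnnotation.TimestampLogicalTypeAnnotation ts =
            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) field.getLogicalTypeAnnotation();
        if (ts.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
          return true;
        }
      }
    }
    return false;
  }
}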

@yihua yihua force-pushed the timestamp_logical_final branch from 15a5d8d to d23db24 Compare November 6, 2025 22:35
@yihua yihua force-pushed the timestamp_logical_final branch from 4a63d1b to 042caf3 Compare November 7, 2025 23:08
@yihua yihua mentioned this pull request Nov 7, 2025
3 tasks
@nsivabalan nsivabalan force-pushed the timestamp_logical_final branch from ffe8d53 to 4446be8 Compare November 10, 2025 22:54
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
// TODO AvroSparkReader need
RecordIterator iterator = RecordIterator.getInstance(this, content);
RecordIterator iterator = RecordIterator.getInstance(this, content, true);
Contributor

Should this take the config to determine if the timestamp millis field should be repaired?

Contributor

Oh, I thought you already reviewed these changes. I was just assisting Vamsi and Lin on triaging test failures from CI.

Contributor

looks like a gap which needs to be fixed.

Contributor

The logic still guarantees correctness, just that it can incur additional schema comparison which is a minor overhead. We'll address this perf optimization in a separate PR.

@nsivabalan nsivabalan marked this pull request as ready for review November 10, 2025 22:56

if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
Schema repairedWriterSchema = AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema);
Contributor

Should we also check the flag enableLogicalTimestampFieldRepair?

Contributor

We're prioritizing correctness in this PR. The schema repair overhead is minimal. We'll take the minor perf optimization separately.
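
To make the correctness stake concrete, a small illustrative example (hypothetical value, not from the PR) of what happens if the stored long is decoded at the wrong precision:

import java.time.Instant;

final class MislabelEffectSketch {
  public static void main(String[] args) {
    long storedValue = 1_730_000_000_000L; // written as epoch millis (late 2024)

    Instant readAsMillis = Instant.ofEpochMilli(storedValue);
    Instant readAsMicros = Instant.ofEpochSecond(storedValue / 1_000_000L,
        (storedValue % 1_000_000L) * 1_000L);

    System.out.println(readAsMillis); // correct: a timestamp in 2024
    System.out.println(readAsMicros); // wrong: interpreted as micros, lands in January 1970
  }
}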

// the default iterator mode is engine-specific record mode
private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
protected final HoodieConfig hoodieReaderConfig;
private boolean enableLogicalTimestampFieldRepair = true;
Contributor

@danny0405 danny0405 Nov 11, 2025

Can we disable it by default to avoid schema parse overhead for other engines?

Contributor

Same as the other comment that we're prioritizing correctness in this PR.
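
A minimal sketch of the gating being discussed (illustrative; the flag name comes from the snippet above, everything else is hypothetical), showing how a reader flag could skip the repair entirely for engines that never produced mislabeled files:

import java.util.function.BinaryOperator;
import org.apache.avro.Schema;

final class GatedRepairSketch {
  private final boolean enableLogicalTimestampFieldRepair;
  private final BinaryOperator<Schema> repairFn; // e.g. AvroSchemaRepair::repairLogicalTypes

  GatedRepairSketch(boolean enableLogicalTimestampFieldRepair, BinaryOperator<Schema> repairFn) {
    this.enableLogicalTimestampFieldRepair = enableLogicalTimestampFieldRepair;
    this.repairFn = repairFn;
  }

  Schema maybeRepair(Schema fileSchema, Schema tableSchema) {
    // When the flag is off, skip the extra schema walk entirely; when on, delegate to the repair routine.
    return enableLogicalTimestampFieldRepair ? repairFn.apply(fileSchema, tableSchema) : fileSchema;
  }
}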

@nsivabalan
Contributor

@hudi-bot run azure

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@yihua yihua merged commit fd79a16 into apache:master Nov 11, 2025
70 checks passed
yihua added a commit to yihua/hudi that referenced this pull request Nov 11, 2025
…4161)

Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Vamsi <vamsi@onehouse.ai>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Co-authored-by: Lin Liu <linliu.code@gmail.com>
val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName), exclusionFields)
val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName), exclusionFields)

spark.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", supportVectorizedRead.toString)
Member

@TheR1sing3un TheR1sing3un Dec 4, 2025

A friendly reminder:
If we modify this configuration in the Spark sessionState conf inside the Hudi logic, it may disrupt the read logic of other datasources.
For example, suppose this configuration is initially set to true and a Spark SQL query reads both a Hudi table and another datasource table such as a Hive table. The desired behavior is that whether Hudi performs vectorized reading is controlled by the Hudi logic itself, while Hive directly performs vectorized reading.
However, if we change this configuration here, it may cause Hive to not perform vectorized reading.

cc @jonvex @yihua

Member

Same problem in BaseFileOnlyRelation.scala: #9129 #10134

Contributor

@danny0405 danny0405 Dec 4, 2025

You mean the override of the conf in the Spark session would affect the behavior of federated queries with multiple data sources in one Spark workload, like a Hive data source together with Hudi?

@yihua it looks like a regression though.

Member

yes
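
One way to avoid the cross-datasource side effect being flagged here (a hedged sketch only, not what the PR does) is to capture and restore the session-level value around the Hudi-specific read path:

import org.apache.spark.sql.SparkSession;

final class VectorizedReaderConfSketch {
  static void withVectorizedReaderSetting(SparkSession spark, boolean enable, Runnable readAction) {
    String key = "spark.sql.parquet.enableVectorizedReader";
    String previous = spark.conf().get(key, "true"); // remember the session-level value
    try {
      spark.conf().set(key, Boolean.toString(enable)); // apply the Hudi-specific setting
      readAction.run();
    } finally {
      spark.conf().set(key, previous); // restore, so other datasources keep their behavior
    }
  }
}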

linliu-code added a commit to linliu-code/hudi that referenced this pull request Dec 15, 2025
…4161)

Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Vamsi <vamsi@onehouse.ai>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Co-authored-by: Lin Liu <linliu.code@gmail.com>

Labels

release-1.1.0 size:XL PR with lines of changes > 1000


8 participants