fix(ingest): Repair affected logical timestamp milli tables #14161
Conversation
if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
  return oldValue;
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
Is this change needed?
Yeah, Aditya was getting ingest failures from this.
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
Schema repairedFileSchema = AvroSchemaRepair.repairLogicalTypes(getSchema(), schema);
Let's do this only if the table schema contains logical ts-millis.
AvroSchemaRepair.repairLogicalTypes will return the original schema if there is no ts-millis.
We'll follow up with additional minor perf optimization in a separate PR.
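For readers following along, here is a minimal, hypothetical sketch (in plain Avro terms, not the actual Hudi implementation) of the repair semantics being discussed: only fields whose file schema says timestamp-micros while the table schema says timestamp-millis are relabeled, and if nothing needs relabeling the original schema object is returned, so the extra cost is just a schema comparison.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the repair idea; nested records, unions (nullable
// fields), and default values are not handled here for brevity.
public class TimestampMillisRepairSketch {

  static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) {
    if (fileSchema.getType() != Schema.Type.RECORD || tableSchema.getType() != Schema.Type.RECORD) {
      return fileSchema;
    }
    boolean changed = false;
    List<Schema.Field> repairedFields = new ArrayList<>();
    for (Schema.Field fileField : fileSchema.getFields()) {
      Schema.Field tableField = tableSchema.getField(fileField.name());
      Schema fieldSchema = fileField.schema();
      if (tableField != null && isMicros(fieldSchema) && isMillis(tableField.schema())) {
        // The long values in the files are really epoch millis; only the label is wrong,
        // so swap the logical type rather than converting any data.
        fieldSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        changed = true;
      }
      repairedFields.add(new Schema.Field(fileField.name(), fieldSchema, fileField.doc(), fileField.defaultVal()));
    }
    // No ts-millis mismatch found: hand back the original schema untouched.
    return changed
        ? Schema.createRecord(fileSchema.getName(), fileSchema.getDoc(), fileSchema.getNamespace(),
            fileSchema.isError(), repairedFields)
        : fileSchema;
  }

  private static boolean isMicros(Schema s) {
    return s.getLogicalType() instanceof LogicalTypes.TimestampMicros;
  }

  private static boolean isMillis(Schema s) {
    return s.getLogicalType() instanceof LogicalTypes.TimestampMillis;
  }
}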
adjustConfToReadWithFileProduceMode(legacyMode, configuration);
ReadContext readContext = super.init(configuration, keyValueMetaData, fileSchema);
- MessageType requestedSchema = readContext.getRequestedSchema();
+ MessageType requestedSchema = SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema(), tableSchema);
Same comment as above.
AvroSchemaRepair.repairLogicalTypes will return the original schema if there is no ts-millis.
Force-pushed from 15a5d8d to d23db24.
Force-pushed from 4a63d1b to 042caf3.
Force-pushed from ffe8d53 to 4446be8.
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
// TODO AvroSparkReader need
- RecordIterator iterator = RecordIterator.getInstance(this, content);
+ RecordIterator iterator = RecordIterator.getInstance(this, content, true);
Should this take the config to determine if the timestamp millis field should be repaired?
Oh, I thought you had already reviewed these changes; I was just assisting Vamsi and Lin with triaging test failures from CI.
Looks like a gap that needs to be fixed.
The logic still guarantees correctness; it just incurs an additional schema comparison, which is a minor overhead. We'll address this perf optimization in a separate PR.
if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
  this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
  Schema repairedWriterSchema = AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema);
Should we also check the flag enableLogicalTimestampFieldRepair?
We're prioritizing correctness in this PR. The schema repair overhead is minimal. We'll take the minor perf optimization separately.
// the default iterator mode is engine-specific record mode
private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
protected final HoodieConfig hoodieReaderConfig;
private boolean enableLogicalTimestampFieldRepair = true;
Can we disable it by default to avoid the schema parsing overhead for other engines?
Same as the other comment: we're prioritizing correctness in this PR.
@hudi-bot run azure
…4161)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Vamsi <vamsi@onehouse.ai>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Co-authored-by: Lin Liu <linliu.code@gmail.com>
val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName), exclusionFields)
val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName), exclusionFields)

spark.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", supportVectorizedRead.toString)
A friendly reminder: if we modify this configuration in the Spark sessionState conf inside the Hudi logic, it may disrupt the read logic of other data sources.
For example, suppose this configuration is initially set to true and a Spark SQL query reads both a Hudi table and another data source such as a Hive table. The behavior we want is that Hudi controls whether it performs vectorized reading itself, while Hive performs vectorized reading directly.
However, if we change this configuration here, Hive may end up not performing vectorized reading.
You mean the override of the conf in the Spark session would affect the behavior of federated queries with multiple data sources in one Spark workload, like the Hive data source together with Hudi?
@yihua it looks like a regression, though.
yes
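To make the scoping concern concrete, here is a small, hypothetical Spark example (generic Spark API, not Hudi code) showing why a session-level SQL conf override is not scoped to a single relation, and why saving and restoring the previous value only protects later queries, not other tables scanned in the same query.

import org.apache.spark.sql.SparkSession;

// Hypothetical demo of session-scoped conf behavior; nothing here is Hudi code.
public class ConfScopeDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("conf-scope-demo")
        .getOrCreate();

    // Remember the caller's setting so later queries are not affected.
    String previous = spark.conf().get("spark.sql.parquet.enableVectorizedReader", "true");

    // A session-level override like this applies to every Parquet scan planned while it
    // is in effect, so a plain Hive/Parquet table joined in the same query would also
    // lose vectorized reading, not just the relation that needed the override.
    spark.conf().set("spark.sql.parquet.enableVectorizedReader", "false");

    // ... run the query that needed the override ...

    // Restoring helps subsequent queries but cannot re-scope the change within the
    // query that already ran under the override.
    spark.conf().set("spark.sql.parquet.enableVectorizedReader", previous);
    spark.stop();
  }
}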
Describe the issue this Pull Request addresses
PR #9743 added more schema evolution functionality and schema processing. However, we used the InternalSchema system for various operations such as fixing null ordering, reordering, and adding columns. At the time, InternalSchema had only a single Timestamp type, which was assumed to be micros when converting back to Avro. Therefore, if the schema provider had any millis columns, the processed schema would end up with those columns as micros.
PR #13711, which updates column stats with better support for logical types, fixed the schema issues as well as additional issues with the handling and conversion of timestamps during ingestion.
This PR adds functionality to the Spark and Hive readers and writers to automatically repair affected tables.
After switching to the 1.1 binary, the affected columns will undergo an evolution from timestamp-micros to timestamp-millis. While this is normally a lossy evolution that is not supported, it is safe here because the data is actually still timestamp-millis; it is merely mislabeled as micros in the Parquet and table schemas.
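As a hypothetical illustration of the mislabeling (the column and value below are made up): the stored longs are epoch milliseconds, so interpreting them under a timestamp-micros label shifts the instants by a factor of 1000.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration; the column and value are made up.
public class MislabelDemo {
  public static void main(String[] args) {
    // What the writer intended: a long column carrying epoch millis.
    Schema intended = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    // What the processed schema ended up declaring for the same column.
    Schema mislabeled = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
    System.out.println("intended: " + intended + ", mislabeled: " + mislabeled);

    long stored = 1_700_000_000_000L; // actually epoch millis (2023-11-14T22:13:20Z)
    Instant readAsMillis = Instant.ofEpochMilli(stored);
    Instant readAsMicros = Instant.EPOCH.plus(stored, ChronoUnit.MICROS);
    System.out.println("read as millis: " + readAsMillis); // 2023-11-14T22:13:20Z
    System.out.println("read as micros: " + readAsMicros); // 1970-01-20T16:13:20Z, off by 1000x
  }
}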
Summary and Changelog
When reading from a Hudi table using the Spark or Hive reader, if the table schema declares a column as millis but the data schema declares it as micros, we assume that the column is affected and read it as a millis value instead of a micros value. This correction is also applied to all readers used by the default write paths, so the Parquet files become correct as the table is rewritten. A table's latest snapshot can be fixed immediately by writing one commit with the 1.1 binary and then clustering the entire table.
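For illustration only, a rough sketch of that remediation using the Spark datasource, assuming standard Hudi write and inline-clustering options (the table name, paths, and key fields below are placeholders, and the clustering plan strategy still has to be configured to cover the whole table):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Rough sketch only; table name, paths, and key fields are placeholders.
public class RepairByClusteringSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("repair-sketch").getOrCreate();

    // Any small batch of records written with the 1.1 binary.
    Dataset<Row> batch = spark.read().format("json").load("/tmp/one_small_batch.json");

    batch.write().format("hudi")
        .option("hoodie.table.name", "trips")                       // placeholder
        .option("hoodie.datasource.write.recordkey.field", "uuid")  // placeholder
        .option("hoodie.datasource.write.precombine.field", "ts")   // placeholder
        // Trigger clustering right after this commit so affected data files are rewritten
        // with the corrected timestamp-millis schema.
        .option("hoodie.clustering.inline", "true")
        .option("hoodie.clustering.inline.max.commits", "1")
        .mode(SaveMode.Append)
        .save("/data/hudi/trips"); // placeholder base path

    spark.stop();
  }
}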
Impact
Repairs affected tables.
Risk Level
High. Extensive testing was done and functional tests were added.
Documentation Update
#14100
Contributor's checklist