Merged
Changes from all commits (84 commits)
cf502e3
current progress
Oct 16, 2025
c5e6421
seems to be working for spark non vectorized and avro
Oct 17, 2025
1f28581
filters working
Oct 17, 2025
19fdf52
prevent overflow
Oct 17, 2025
8a8194f
use read support instead of mapping function
Oct 20, 2025
a957db1
use repaired schema instead of doing operations after we read the data
Oct 20, 2025
9e660fb
add spark log support
Oct 20, 2025
290f90f
remove find cols to multiply class
Oct 20, 2025
4f6277f
log file changes as requested and set up the read supports for spark …
Oct 21, 2025
acf3ac9
hive working
Oct 21, 2025
2cb2fda
add individual test and fix issue with dropping messagetype logical
Oct 22, 2025
e325e68
revert calls for rewrite avro with extra param
Oct 22, 2025
a9a3958
revert config to prevent timestamp evolutions
Oct 22, 2025
22c2cd9
A few fixes
Oct 22, 2025
1552ff9
fix bug with field reuse in avro schema repair
Oct 22, 2025
1e68dd2
fix read parquet log block timestamp ntz
Oct 22, 2025
8e7454f
allow long to timestampntz without cast
Oct 22, 2025
aff83fa
refactor AvroSchemaRepair for performance and add unit tests
Oct 23, 2025
c82f3bc
refactor schema repair for performance and add testing
Oct 23, 2025
1a95e25
try fix other spark versions
Oct 23, 2025
3dc2aac
fix spark 3.3 build
Oct 24, 2025
1e9b7e4
fix spark3.4 build
Oct 24, 2025
b99c1a0
hopefully fix spark4
Oct 24, 2025
a4d08ef
fix issue with union schema, add table schema to cdc in missing place
Oct 24, 2025
d53c4e5
add spark cow read testing for repair
Oct 24, 2025
77b336d
building, and add spark mor tests
Oct 24, 2025
f8bdd66
forgot to add the zips
Oct 24, 2025
47afa1d
cow write testing
Oct 25, 2025
11f08d5
add mor testing
Oct 25, 2025
bc46e51
disable tests for spark 3.3
Oct 26, 2025
61eabe6
fix for spark 3.4
Oct 26, 2025
9b37bd4
fix spark33 for real
Oct 26, 2025
47ce682
remove fg reader test
Oct 26, 2025
2ce59e1
remove unneeded avro utils change
Oct 26, 2025
7009727
fix spark 4
Oct 27, 2025
f638cd8
fix timestamps in deltastreamer test
Oct 27, 2025
045130f
fix failing test
Oct 27, 2025
c05a361
vectorized fallback for 3.3 and 3.4
Oct 28, 2025
459cba7
fix vectorized fallback
Oct 28, 2025
c3cab96
add testing, and also fallback for local timestamp millis
Oct 28, 2025
a2b609f
add tests from java-parquet
Oct 28, 2025
536baf1
replace import with hardcode
Oct 28, 2025
2f9f582
fix long import names
Oct 29, 2025
8ed13d0
address most review comments
Oct 29, 2025
e9c2540
Fixing spark3.3 reads
nsivabalan Nov 5, 2025
206fab4
minor renames
nsivabalan Nov 5, 2025
9e06e70
Adding java docs
nsivabalan Nov 5, 2025
43661f7
java docs
nsivabalan Nov 5, 2025
fefa7d8
refactor repairFooterSchema to a common module
Nov 5, 2025
20c217f
rename resolveNullableSchema method
Nov 5, 2025
dfaf9df
add check for no compactions and clustering on test table
Nov 5, 2025
36b1c60
Add checks for enableLogicalTimestampFieldRepair
Nov 5, 2025
f857017
Revert pom changes
yihua Nov 6, 2025
3a07c44
Revert "refactor repairFooterSchema to a common module"
yihua Nov 6, 2025
6422a6e
Fix scalastyle
yihua Nov 6, 2025
cd0bca5
Fix build on Spark 3.3 and 3.4
yihua Nov 6, 2025
50ebfd4
Fix TestHoodieDeltaStreamer
yihua Nov 6, 2025
294337d
Fix licences
yihua Nov 6, 2025
f3b52ae
Fix Spark 4
yihua Nov 7, 2025
9ce9c57
Fix Spark 4 and renames
yihua Nov 7, 2025
5fe65e6
Fix Spark40LegacyHoodieParquetFileFormat
yihua Nov 7, 2025
1171713
Add flag for repairing logical timestamp in avro log file reader
Nov 7, 2025
7304a96
Fix checkstyle
yihua Nov 7, 2025
0ad14ab
Fix bug in config passing
yihua Nov 7, 2025
38ba9b3
Disable TestAvroSchemaConverter to validate other tests
yihua Nov 7, 2025
152d849
Update testNestedTypeVectorizedReadWithTypeChange test behavior
yihua Nov 8, 2025
8ebc4d5
Another fix
yihua Nov 8, 2025
5c4d16d
Fix the testLogicalTypesReadRepair
linliu-code Nov 8, 2025
4034a51
Make schema fetching and parsing lazy for adjusting column and partit…
yihua Nov 8, 2025
37ab24c
Fix testReadIndexDefFromStorage
yihua Nov 8, 2025
3df5906
Enhance test
yihua Nov 8, 2025
efbfa7c
Disable failed tests for investigation
yihua Nov 8, 2025
6db7a16
Fix test logic
yihua Nov 8, 2025
fddab79
Fix naming of configs
yihua Nov 10, 2025
5e8bb52
fix col stats test failures
Nov 10, 2025
4446be8
Addressing Siva's feedback on fixing col stats for v1 index defn
nsivabalan Nov 10, 2025
9f96890
Fixing tests in TestHoodieTableMetadataUtil
nsivabalan Nov 11, 2025
a04d3ca
Fix tests to follow out-of-the-box behavior
yihua Nov 11, 2025
08f332a
Fix testReadIndexDefFromStorage
yihua Nov 11, 2025
886029a
Remove test comment
yihua Nov 11, 2025
3da1946
Fix typo
yihua Nov 11, 2025
9b1c057
Fix ShowColumnStatsOverlapProcedure
yihua Nov 11, 2025
d57cede
Fix AvroSchemaRepair for Avro compatibility
yihua Nov 11, 2025
ea6eca9
Skip timestamp millis column for initial table version < 9
linliu-code Nov 11, 2025
Files changed:
@@ -117,7 +117,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -1019,7 +1019,7 @@ static void validateSecondaryIndexSchemaEvolution(

if (writerField != null && !tableField.schema().equals(writerField.schema())) {
// Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
if (resolveNullableSchema(tableField.schema()).equals(resolveNullableSchema(writerField.schema()))) {
if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
continue;
}

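For context, getNonNullTypeFromUnion (which replaces resolveNullableSchema throughout this PR) extracts the value type from a nullable Avro union so two field schemas can be compared while ignoring nullability. A minimal sketch of that idea, assuming the Hudi helper behaves roughly like the hypothetical method below (this is not the actual implementation):

import org.apache.avro.Schema;

class NonNullUnionSketch {
  // Illustrative approximation of what a "non-null type from union" helper does.
  static Schema nonNullTypeFromUnion(Schema schema) {
    if (schema.getType() != Schema.Type.UNION) {
      return schema;                               // already a concrete type
    }
    return schema.getTypes().stream()
        .filter(s -> s.getType() != Schema.Type.NULL)
        .findFirst()                               // ["null", "long"] -> "long"
        .orElseThrow(() -> new IllegalArgumentException("union contains only null"));
  }
}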
--- next file ---
@@ -94,7 +94,7 @@ public StorageConfiguration<?> getStorageConf() {

@Override
public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf, dataSchema) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
setupJobconf(jobConf, avroSchema);
return new HiveHoodieReaderContext(readerCreator,
--- next file ---
@@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -268,7 +269,11 @@ private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, Writab
break;

case LONG:
assertInstanceOf(LongWritable.class, writable);
if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
assertInstanceOf(TimestampWritable.class, writable);
} else {
assertInstanceOf(LongWritable.class, writable);
}
break;

case FLOAT:
--- next file ---
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;

import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.SchemaRepair;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -60,13 +62,16 @@

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

public class HoodieSparkParquetReader implements HoodieSparkFileReader {

public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
private final StoragePath path;
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private final List<ClosableIterator> readerIterators = new ArrayList<>();
private Option<MessageType> fileSchemaOption = Option.empty();
private Option<StructType> structTypeOption = Option.empty();
private Option<Schema> schemaOption = Option.empty();

Expand Down Expand Up @@ -116,19 +121,20 @@ public ClosableIterator<String> getRecordKeyIterator() throws IOException {
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema requestedSchema) throws IOException {
return getUnsafeRowIterator(HoodieInternalRowUtils.getCachedSchema(requestedSchema));
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSchema) throws IOException {
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(getStructSchema(), requestedSchema, SQLConf.get().sessionLocalTimeZone());
Schema nonNullSchema = AvroSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : getFileSchema());
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
String readSchemaJson = evolution.getRequestSchema().json();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), messageSchema),
new Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
@@ -139,15 +145,22 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSche
return projectedIterator;
}

private MessageType getFileSchema() {
if (fileSchemaOption.isEmpty()) {
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
fileSchemaOption = Option.of(messageType);
}
return fileSchemaOption.get();
}

@Override
public Schema getSchema() {
if (schemaOption.isEmpty()) {
// Some types in avro are not compatible with parquet.
// Avro only supports representing Decimals as fixed byte array
// and therefore if we convert to Avro directly we'll lose logical type-info.
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
structTypeOption = Option.of(structType);
MessageType messageType = getFileSchema();
StructType structType = getStructSchema();
schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
.getAvroSchemaConverters()
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
@@ -157,11 +170,16 @@ public Schema getSchema() {

protected StructType getStructSchema() {
if (structTypeOption.isEmpty()) {
getSchema();
MessageType messageType = getFileSchema();
structTypeOption = Option.of(convertToStruct(messageType));
}
return structTypeOption.get();
}

private StructType convertToStruct(MessageType messageType) {
return new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
}

@Override
public void close() {
readerIterators.forEach(ClosableIterator::close);
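The repaired read path above is gated by the new ENABLE_LOGICAL_TIMESTAMP_REPAIR key, which defaults to true. A hedged example of turning the repair off through the Hadoop configuration backing the storage (the key name is taken from the diff; the surrounding setup is illustrative only):

import org.apache.hadoop.conf.Configuration;

class DisableTimestampRepairExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same key as HoodieSparkParquetReader.ENABLE_LOGICAL_TIMESTAMP_REPAIR; default is true.
    conf.setBoolean("spark.hudi.logicalTimestampField.repair.enable", false);
    // A reader built on storage backed by this conf would skip
    // SchemaRepair.repairLogicalTypes and use the raw Parquet footer schema.
    System.out.println(conf.getBoolean("spark.hudi.logicalTimestampField.repair.enable", true));
  }
}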
--- next file ---
@@ -76,7 +76,7 @@
import scala.Enumeration;
import scala.Function1;

import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
Expand Down Expand Up @@ -226,7 +226,7 @@ private void writeFields(InternalRow row, StructType schema, ValueWriter[] field
}

private ValueWriter makeWriter(Schema avroSchema, DataType dataType) {
Schema resolvedSchema = avroSchema == null ? null : resolveNullableSchema(avroSchema);
Schema resolvedSchema = avroSchema == null ? null : getNonNullTypeFromUnion(avroSchema);
LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;

if (dataType == DataTypes.BooleanType) {
@@ -429,7 +429,7 @@ private Type convertField(Schema avroFieldSchema, StructField structField) {
}

private Type convertField(Schema avroFieldSchema, StructField structField, Type.Repetition repetition) {
Schema resolvedSchema = avroFieldSchema == null ? null : resolveNullableSchema(avroFieldSchema);
Schema resolvedSchema = avroFieldSchema == null ? null : getNonNullTypeFromUnion(avroFieldSchema);
LogicalType logicalType = resolvedSchema != null ? resolvedSchema.getLogicalType() : null;

DataType dataType = structField.dataType();
--- next file ---
@@ -101,7 +101,7 @@ object AvroConversionUtils {
recordNamespace: String): Row => GenericRecord = {
val serde = getCatalystRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema

val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)

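The nullability check above (and the identical checks in HoodieSparkUtils below) relies on the helper returning the same schema for a non-union type and the unwrapped branch for a ["null", T] union, so inequality means the field was nullable. A small hedged example using the stock Avro SchemaBuilder; the comments describe the expected helper behavior, not code from this PR:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

class NullableCheckExample {
  public static void main(String[] args) {
    Schema plain = SchemaBuilder.builder().longType();                                 // "long"
    Schema nullable = SchemaBuilder.unionOf().nullType().and().longType().endUnion();  // ["null","long"]
    // getNonNullTypeFromUnion(plain) returns the same schema    -> not nullable
    // getNonNullTypeFromUnion(nullable) returns the long branch -> differs, so nullable
    System.out.println(plain.isUnion() + " " + nullable.isUnion());                    // false true
  }
}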
--- next file ---
@@ -91,7 +91,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
@@ -160,7 +160,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
--- next file ---
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator

import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -70,27 +73,36 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
override def getFileRecordIterator(filePath: StoragePath,
start: Long,
length: Long,
dataSchema: Schema,
dataSchema: Schema, // dataSchema refers to the table schema in most cases (non-log-file reads).
requiredSchema: Schema,
storage: HoodieStorage): ClosableIterator[InternalRow] = {
val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
// TODO: introduce pk filter in log file reader
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema).asInstanceOf[ClosableIterator[InternalRow]]
} else {
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(InternalRow.empty, filePath, start, length)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)

// Convert Avro dataSchema to Parquet MessageType for timestamp precision conversion
val tableSchemaOpt = if (dataSchema != null) {
val hadoopConf = storage.getConf.unwrapAs(classOf[Configuration])
val parquetSchema = getAvroSchemaConverter(hadoopConf).convert(dataSchema)
org.apache.hudi.common.util.Option.of(parquetSchema)
} else {
org.apache.hudi.common.util.Option.empty[org.apache.parquet.schema.MessageType]()
}
new CloseableInternalRowIterator(baseFileReader.read(fileInfo,
readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt,
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))
}
}

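The tableSchemaOpt built above converts the Avro table schema into a Parquet MessageType through Hudi's HoodieAvroParquetSchemaConverter wrapper. As a hedged illustration of the same kind of conversion using the stock parquet-avro converter (not the exact code path in this PR):

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

class AvroToParquetSchemaExample {
  public static void main(String[] args) {
    // Illustrative record with a timestamp-millis field.
    Schema avro = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
            + "[{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");
    MessageType parquet = new AvroSchemaConverter(new Configuration()).convert(avro);
    System.out.println(parquet);  // INT64 ts carrying the timestamp-millis annotation
  }
}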
--- next file ---
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.util
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -37,12 +38,14 @@ trait SparkColumnarFileReader extends Serializable {
* @param internalSchemaOpt option of internal schema for schema.on.read
* @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
* @param storageConf the hadoop conf
* @param tableSchemaOpt option of table schema for timestamp precision conversion
* @return iterator of rows read from the file; the output type says [[InternalRow]] but could be [[ColumnarBatch]]
*/
def read(file: PartitionedFile,
requiredSchema: StructType,
partitionSchema: StructType,
internalSchemaOpt: util.Option[InternalSchema],
filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
storageConf: StorageConfiguration[Configuration],
tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow]
}
--- next file ---
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

object HoodieParquetFileFormatHelper {

@@ -58,6 +58,9 @@
def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true

// prevent illegal cast
case (TimestampNTZType, LongType) => true

case (ArrayType(rt, _), ArrayType(ft, _)) =>
// Do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(rt, ft)
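The new (TimestampNTZType, LongType) case above works because Spark stores TimestampNTZ values as the same 64-bit microseconds-since-epoch payload that a LongType column holds; only the interpretation changes, so no cast expression is needed. A hedged illustration of that reinterpretation (the value is made up):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

class NtzReinterpretExample {
  public static void main(String[] args) {
    long epochMicros = 1_700_000_000_000_000L;     // what a LongType column physically stores
    LocalDateTime ntz = LocalDateTime.ofEpochSecond(
        epochMicros / 1_000_000L,
        (int) (epochMicros % 1_000_000L) * 1_000,  // micros -> nanos
        ZoneOffset.UTC);
    System.out.println(ntz);                       // same bits, read as a timezone-less timestamp
  }
}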
--- next file ---
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils

import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type, Types}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec

import java.time.ZoneId
@@ -34,17 +34,24 @@ import scala.collection.JavaConverters._
class HoodieParquetReadSupport(
convertTz: Option[ZoneId],
enableVectorizedReader: Boolean,
val enableTimestampFieldRepair: Boolean,
datetimeRebaseSpec: RebaseSpec,
int96RebaseSpec: RebaseSpec)
int96RebaseSpec: RebaseSpec,
tableSchemaOpt: org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = org.apache.hudi.common.util.Option.empty())
extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {

override def init(context: InitContext): ReadContext = {
val readContext = super.init(context)
val requestedParquetSchema = readContext.getRequestedSchema
// repair is needed here because this is the schema that is used by the reader to decide what
// conversions are necessary
val requestedParquetSchema = if (enableTimestampFieldRepair) {
SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema, tableSchemaOpt)
} else {
readContext.getRequestedSchema
}
val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
}

}

object HoodieParquetReadSupport {
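SchemaRepair.repairLogicalTypes, applied in init above when enableTimestampFieldRepair is set, repairs logical timestamp information on the requested Parquet schema using the table schema as the reference (see the tableSchemaOpt plumbing earlier in this PR). As a hedged sketch of the kind of annotation such a repair restores — not the repair algorithm itself — built with the standard parquet-column Types builder:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class TimestampAnnotationSketch {
  public static void main(String[] args) {
    // An INT64 column that lost its logical type reads back as plain longs;
    // a repaired schema carries the annotation so Spark decodes timestamps.
    PrimitiveType repaired = Types.required(PrimitiveType.PrimitiveTypeName.INT64)
        .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
        .named("event_time");  // "event_time" is an illustrative column name
    System.out.println(repaired);
  }
}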