2680: chore: various refactoring changes for iceberg #5
Conversation
Walkthrough
This PR introduces parquet-format-structures dependency management, adds a FileInfo abstraction to NativeBatchReader for flexible input source handling, creates IcebergCometNativeBatchReader for Iceberg integration with serialized metadata support, and adds a ParquetMetadataSerializer utility for metadata serialization.
Review completed. 1 suggestion posted.
        deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
        print(f"Query {query} results written to {output_path}")
    else:
        print(f"Skipping write: DataFrame has no schema for {output_path}")
output_path is referenced here but is only defined inside the preceding if-block, which will raise a NameError when write_path is set and the DataFrame has no schema (empty columns).
d73bcba to 6b551d7
    this.preInitializedReaders = preInitializedReaders;

    // Call parent init method
    super.init();
Bug: Legacy Date Timestamp Not Honored in Init
The IcebergCometNativeBatchReader.init() method receives the useLegacyDateTimestamp parameter but doesn't assign it to the inherited this.useLegacyDateTimestamp field. This causes super.init() to use the field's default value (false) instead of the provided parameter, potentially leading to incorrect date/timestamp handling.
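A minimal sketch of the kind of fix being described, assuming the assignment is placed alongside the other parent-field assignments shown in this PR; the surrounding parameters are elided:

```java
// Inside IcebergCometNativeBatchReader.init(...), before calling super.init():
// assign the inherited field so the parent sees the caller-supplied value
// instead of its default (false).
this.useLegacyDateTimestamp = useLegacyDateTimestamp;

// Call parent init method
super.init();
```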
value:useful; category:bug; feedback:The AI reviewer is correct that the passed function argument was not stored, so IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), and this may lead to wrongly parsed dates and timestamps.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- common/pom.xml (1 hunks)
- common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java (1 hunks)
- common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java (1 hunks)
- common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (10 hunks)
- common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java (1 hunks)
- native/core/Cargo.toml (1 hunks)
- pom.xml (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (3)
- common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala (2)
  - NativeConfig (30-88)
  - extractObjectStoreOptions (58-87)
- common/src/main/java/org/apache/comet/parquet/ReadOptions.java (1)
  - ReadOptions (36-210)
- common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala (1)
  - toArrowSchema (191-195)
    // Set parent fields
    this.conf = conf;
    this.fileInfo = fileInfo;
    this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
    this.nativeFilter = nativeFilter;
    this.capacity = capacity;
    this.dataSchema = dataSchema;
    this.isCaseSensitive = isCaseSensitive;
    this.useFieldId = useFieldId;
    this.ignoreMissingIds = ignoreMissingIds;
    this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
    this.preInitializedReaders = preInitializedReaders;

    // Call parent init method
    super.init();
  }
Propagate legacy timestamp handling (and metrics) into the parent state
useLegacyDateTimestamp gets passed into this init method but never stored on the parent, so the downstream NativeColumnReader creation always sees the default false. In legacy-read scenarios this flips the behavior (either rebasing unexpectedly or throwing) and gives wrong results. While you're here, the incoming metrics map is ignored as well, so all native metrics remain unset on the Iceberg path.
Please wire both through before calling super.init():
      this.partitionSchema = partitionSchema;
      this.partitionValues = partitionValues;
-     this.preInitializedReaders = preInitializedReaders;
+     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+     this.preInitializedReaders = preInitializedReaders;
+     this.metrics.clear();
+     if (metrics != null) {
+       this.metrics.putAll(metrics);
+     }

🤖 Prompt for AI Agents
In
common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
around lines 58 to 74, the init method assigns many fields but does not
propagate the incoming useLegacyDateTimestamp flag nor the metrics map into the
parent state before calling super.init(), causing NativeColumnReader creation to
see defaults and metrics to be lost; fix by assigning the parent's corresponding
fields (store useLegacyDateTimestamp and the incoming metrics map into the
parent/member variables used by the superclass) prior to invoking super.init(),
then call super.init() so legacy timestamp handling and native metrics are
honored.
value:useful; category:bug; feedback:The AI reviewer is correct that the passed function argument was not stored, so IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), and this may lead to wrongly parsed dates and timestamps.
The AI reviewer is also correct that the metrics argument is missed as well. This will lead to using the inherited metrics from NativeBatchReader, which most probably will be incorrect.
@martin-augment Thank you for confirming the analysis and providing the detailed feedback! I'm glad the review caught both the useLegacyDateTimestamp and metrics issues before they could cause incorrect date/timestamp parsing and metrics reporting in production.
The suggested fix will ensure that both fields are properly propagated to the parent class before super.init() is called, so the Iceberg reader will honor the legacy timestamp handling flag and correctly track native metrics.
    // Filter out columns with preinitialized readers from sparkSchema before making the
    // call to native
    if (preInitializedReaders != null) {
      StructType filteredSchema = new StructType();
      StructField[] sparkFields = sparkSchema.fields();
      for (int i = 0; i < sparkFields.length; i++) {
        if (i >= preInitializedReaders.length || preInitializedReaders[i] == null) {
          filteredSchema = filteredSchema.add(sparkFields[i]);
        }
      }
      sparkSchema = filteredSchema;
    }
Filtering sparkSchema here breaks column index alignment
Dropping pre-initialized columns by overwriting sparkSchema causes two failures:
- Later in loadNextBatch we still iterate i over the original Parquet field count. As soon as a pre-initialized column appears before another projected column, sparkSchema.fields()[i] trips an ArrayIndexOutOfBoundsException because the filtered schema is now shorter.
- We pass i straight into NativeColumnReader(handle, i, …). After removing entries from the Arrow schema, native column i no longer exists, so we either read the wrong column or crash.
Net effect: any file that relies on preInitializedReaders and projects multiple columns will blow up at runtime.
We need to keep the stored sparkSchema (and column indexes) intact and introduce a separate mapping for the subset we expose to native—e.g., keep the original schema, build a filtered schema just for Arrow/native, and track the mapping from requested column index → native column index when instantiating NativeColumnReader. Until that mapping is in place, we cannot mutate sparkSchema here.
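A minimal sketch of that direction, under the assumption that the names nativeSchema, nativeColumnIndex, and nextNativeIndex are hypothetical; this is one way to keep the original indexes intact while exposing a filtered schema to native, not the PR's implementation:

```java
// Keep the full sparkSchema for row indexing; build a separate schema for the
// native reader and remember which native column each original column maps to.
StructField[] sparkFields = sparkSchema.fields();
StructType nativeSchema = new StructType();            // hypothetical: subset exposed to native
int[] nativeColumnIndex = new int[sparkFields.length]; // hypothetical: original -> native index
int nextNativeIndex = 0;
for (int i = 0; i < sparkFields.length; i++) {
  boolean hasPreInitializedReader =
      preInitializedReaders != null
          && i < preInitializedReaders.length
          && preInitializedReaders[i] != null;
  if (hasPreInitializedReader) {
    nativeColumnIndex[i] = -1; // served by the pre-initialized reader, not by native
  } else {
    nativeSchema = nativeSchema.add(sparkFields[i]);
    nativeColumnIndex[i] = nextNativeIndex++;
  }
}
// When instantiating readers, pass the remapped index instead of i, e.g.
// new NativeColumnReader(handle, nativeColumnIndex[i], ...), skipping columns
// whose mapped index is -1.
```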
value:useful; category:bug; feedback:The AI reviewer is correct that the number of fields in the sparkSchema may be reduced if there are preInitializedReaders and this may lead to ArrayIndexOutOfBoundsException when trying to read a field by index that is beyond the list size. This finding would prevent runtime errors!
@martin-augment Thank you for confirming! I'm glad the analysis helped catch this issue before it caused runtime problems. The original comment outlines a path forward—keeping the full sparkSchema intact for indexing while creating a separate filtered schema for the native layer, along with an index mapping for NativeColumnReader. Let me know if you'd like me to help explore the fix further or if you have any questions!
augment review
Review completed. 1 suggestion posted.
    import org.apache.spark.sql.execution.metric.SQLMetric;
    import org.apache.spark.sql.types.StructType;

    /**
The class Javadoc says ParquetMetadata is passed as a JSON string, but this API takes raw bytes and uses Parquet’s serializer; consider updating the comment to say “serialized bytes” instead of “JSON string.”
value:useful; category:documentation; feedback:The AI reviewer is correct that the Javadoc documentation is not correct. https://github.com/apache/datafusion-comet/pull/2680/files#diff-e57878f6cd8036999500de5719f8f4bbe28e1ed5dcb79a02ad7d7eb206f37473R44 shows that the input is provided as a byte array, not a String. This would prevent confusion for developers about how to use this new API.
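For context, a hedged sketch of what a bytes-based round trip over parquet-format-structures can look like; the classes used here (org.apache.parquet.format.Util and FileMetaData from parquet-format-structures, ParquetMetadataConverter from parquet-hadoop) are real, but this is an illustration, not the PR's actual ParquetMetadataSerializer implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Illustrative only: serialize/deserialize a ParquetMetadata footer as raw bytes.
class ParquetMetadataBytesExample {
  private final ParquetMetadataConverter converter = new ParquetMetadataConverter();

  byte[] serialize(ParquetMetadata footer) throws IOException {
    // Convert the Hadoop-side ParquetMetadata into the Thrift FileMetaData
    // defined by parquet-format-structures, then write it to a byte array.
    FileMetaData thriftMetadata = converter.toParquetMetadata(1, footer);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Util.writeFileMetaData(thriftMetadata, out);
    return out.toByteArray();
  }

  ParquetMetadata deserialize(byte[] bytes) throws IOException {
    // Read the Thrift FileMetaData back and convert it into ParquetMetadata.
    FileMetaData thriftMetadata = Util.readFileMetaData(new ByteArrayInputStream(bytes));
    return converter.fromParquetMetadata(thriftMetadata);
  }
}
```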
2680: To review by AI
Note
Adds an Iceberg-specific batch reader that initializes from serialized Parquet metadata, refactors NativeBatchReader with FileInfo and preinitialized column support, and updates Parquet/Rust dependencies.
- IcebergCometNativeBatchReader to initialize from FileInfo and serialized ParquetMetadata bytes; exposes getSparkSchema.
- NativeBatchReader: FileInfo abstraction and alternate constructor; allow init without PartitionedFile.
- preInitializedReaders and filter them from sparkSchema before native call.
- getColumnIndexFromParquetColumn.
- UTC for Arrow schema conversion; derive object store options from path Uri.
- filePath; broaden fields/methods to protected; add no-arg protected ctor.
- FileReader construction (remove footer arg) and related range/path handling.
- AbstractColumnReader: add getPath().
- ParquetMetadataSerializer to serialize/deserialize ParquetMetadata via parquet-format-structures.
- org.apache.parquet:parquet-format-structures; update exclusions in parent pom.xml.
- opendal to 0.54.0; bump uuid to 1.18.1.

Written by Cursor Bugbot for commit 6b551d7. This will update automatically on new commits.