2680: chore: various refactoring changes for iceberg [iceberg] #14
base: main
Conversation
Walkthrough
Added the parquet-format-structures Maven dependency across pom.xml files. Introduced IcebergCometNativeBatchReader for Iceberg integration and ParquetMetadataSerializer for metadata serialization. Enhanced NativeBatchReader with a FileInfo abstraction and support for preinitialized readers. Added a helper method in AbstractColumnReader. Updated the uuid dependency version in Cargo.toml.
```diff
  try (FileReader fileReader =
      new FileReader(
-         CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {
+         CometInputFile.fromPath(path, conf), readOptions, cometReadOptions, metrics)) {
```
Bug: Optimize FileReader Footer Handling
The FileReader constructor call omits the footer parameter, causing the footer to be re-read from the file even though this.footer is already available. This wastes the pre-loaded footer metadata and defeats optimizations where the footer is passed in separately (like from Iceberg). The constructor should pass footer as the second parameter to use the cached metadata instead of re-reading from disk.
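A minimal sketch of the suggested fix, assuming the five-argument FileReader constructor shown on the removed line above is still available:

```java
// Sketch: pass the already-parsed footer back into FileReader so it is not
// re-read from storage; mirrors the constructor overload on the removed line.
try (FileReader fileReader =
    new FileReader(
        CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {
  // ... existing read logic ...
}
```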
Code Review: Iceberg Native Batch Reader Refactoring

Summary

This PR introduces Iceberg-specific Parquet reading capabilities with serialized metadata support and refactors the core NativeBatchReader to support them.

Code Quality & Best Practices

Strengths

✅ Clean abstraction: the FileInfo abstraction keeps file-location handling separate from the reader logic.

Issues & Suggestions

1. URI Construction in FileInfo Constructor (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:120-122)

```java
URI uri = new Path(filePath).toUri();
if (uri.getScheme() == null) {
  uri = new Path("file://" + filePath).toUri();
}
```

Issue: This logic assumes local files when the scheme is missing, but the original path might be relative or have other intentions.

2. Static Factory Method Naming (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:128)

```java
public static FileInfo fromPartitionedFile(PartitionedFile file)
```

Suggestion: This is a good pattern. Consider adding a corresponding factory for other input types.

3. Timezone Hardcoded to UTC (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:542-543)

```java
// String timeZoneId = conf.get("spark.sql.session.timeZone");
String timeZoneId = "UTC";
```

Issue: The original timezone configuration is commented out and hardcoded to UTC. While the comment at line 543 says "Native code uses UTC always", this removes user flexibility.

4. Exception Handling in IcebergCometNativeBatchReader.init() (common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java:56)

```java
throws Throwable
```

Issue: Using Throwable in the throws clause is overly broad and makes targeted error handling by callers difficult.

5. Metrics Clearing (common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java:73)

```java
this.metrics.clear();
if (metrics != null) {
  this.metrics.putAll(metrics);
}
```

Issue: Clearing metrics before adding new ones could lose existing metrics if the init method is called multiple times.
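If the intent is to accumulate rather than reset, a minimal sketch of the alternative (assuming this.metrics is a plain mutable Map and that keeping earlier entries is actually desired, which the PR does not state) would be to drop the clear:

```java
// Sketch only: merge the incoming metrics instead of wiping the map first,
// so entries registered by an earlier init() call are preserved.
if (metrics != null) {
  this.metrics.putAll(metrics); // overwrites only the keys that are explicitly provided
}
```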
Potential Bugs & Issues

Critical Issues

1. Missing Null Check for dataType (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:999-1011)

```java
DataType dataType = null;
// ... loop to find matching field ...
if (dataType == null) {
  throw new IOException(
      "Could not find matching Spark field for Parquet field: " + field.getName());
}
```

Issue: Good null check, but this could fail if sparkFields is null or empty when the loop runs.

2. Array Index Bounds (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:456)

```java
if (preInitializedReaders != null && preInitializedReaders[i] != null) {
```

Issue: No bounds checking before accessing preInitializedReaders[i].

3. Potential IndexOutOfBounds (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:775-777)

```java
int columnIndex = getColumnIndexFromParquetColumn(column);
if (columnIndex == -1
    || preInitializedReaders == null
    || preInitializedReaders[columnIndex] == null) {
```

Issue: If columnIndex is valid but not smaller than preInitializedReaders.length, the access throws ArrayIndexOutOfBoundsException. A sketch of a centralized bounds check follows below.
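One way to address both bounds issues is a small helper that mirrors the length check already used in the schema-filtering loop; the method name is hypothetical, not from the PR:

```java
// Hypothetical helper: centralizes the null and bounds checks that should run
// before any preInitializedReaders[index] access.
private boolean hasPreInitializedReader(int index) {
  return preInitializedReaders != null
      && index >= 0
      && index < preInitializedReaders.length
      && preInitializedReaders[index] != null;
}
```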
Medium Issues

4. Schema Filtering Side Effect (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:523-541)

```java
if (preInitializedReaders != null) {
  StructType filteredSchema = new StructType();
  // ... filtering logic ...
  sparkSchema = filteredSchema;
}
```

Issue: Mutating the sparkSchema field as a side effect of this filtering makes the reader state harder to reason about, especially if initialization runs more than once.

5. Resource Cleanup in FileInfo.pathUri() (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:147-150)

```java
public URI pathUri() throws Exception {
  URI uri = new URI(filePath);
  return uri;
}
```

Issue: The method signature declares throws Exception even though new URI(String) can only throw URISyntaxException, which forces callers to catch an overly generic exception.
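A minimal sketch of the narrower signature (assuming callers can be updated to handle URISyntaxException):

```java
// Sketch: declare the only checked exception new URI(String) actually throws,
// so callers can handle malformed paths specifically.
public URI pathUri() throws URISyntaxException {
  return new URI(filePath);
}
```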
Performance Considerations

Positive

✅ Metadata serialization: reusing serialized Parquet metadata avoids redundant I/O operations.

Concerns

1. Repeated String Operations (common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java:92-94)

```java
String getPath() {
  return String.join(".", this.descriptor.getPath());
}
```

Issue: This creates a new string every time it's called. If called frequently, consider caching.

2. Array Conversion in Hot Path (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:794)

```java
String[] targetPath = asJava(column.path()).toArray(new String[0]);
```

Issue: Converting Scala collections to Java arrays on every call could be expensive.

3. Nested Loop for Field Matching (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:997-1005)

```java
for (int j = 0; j < sparkFields.length; j++) {
  if (sparkFields[j].name().equals(field.getName())) {
    // ...
  }
}
```

Issue: O(n²) complexity when matching multiple fields; see the lookup-map sketch below.
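A sketch of a one-time name-to-index map that also honors Spark's case-sensitivity flag; the map variable is a hypothetical addition, while sparkFields, field, and isCaseSensitive come from the surrounding review context:

```java
// Build the lookup once per schema instead of scanning sparkFields for every
// Parquet field. Case handling follows the isCaseSensitive flag.
Map<String, Integer> sparkFieldIndex = new HashMap<>();
for (int j = 0; j < sparkFields.length; j++) {
  String key =
      isCaseSensitive ? sparkFields[j].name() : sparkFields[j].name().toLowerCase(Locale.ROOT);
  sparkFieldIndex.put(key, j);
}

String lookupKey =
    isCaseSensitive ? field.getName() : field.getName().toLowerCase(Locale.ROOT);
Integer sparkSchemaIndex = sparkFieldIndex.get(lookupKey);
if (sparkSchemaIndex == null) {
  throw new IOException(
      "Could not find matching Spark field for Parquet field: " + field.getName());
}
DataType dataType = sparkFields[sparkSchemaIndex].dataType();
```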
Security Concerns

Medium Priority

1. Path Injection Risk (common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:120-124)

```java
URI uri = new Path(filePath).toUri();
if (uri.getScheme() == null) {
  uri = new Path("file://" + filePath).toUri();
}
this.filePath = uri.toString();
```

Issue: No validation of filePath before it is normalized into a URI and stored.

2. Serialized Data Deserialization (common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java:69-72)

```java
public ParquetMetadata deserialize(byte[] bytes) throws IOException {
  ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
  FileMetaData fileMetaData = Util.readFileMetaData(inputStream);
  return converter.fromParquetMetadata(fileMetaData);
}
```

Issue: Deserializing untrusted byte arrays could be a security risk if the bytes come from untrusted sources.
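A sketch of basic defensive guards before handing the bytes to the Thrift reader; the size constant is an illustrative assumption, not a value from the PR:

```java
// Sketch: reject empty or implausibly large payloads before Thrift parsing.
// MAX_SERIALIZED_FOOTER_BYTES is a hypothetical, configurable cap.
public ParquetMetadata deserialize(byte[] bytes) throws IOException {
  if (bytes == null || bytes.length == 0) {
    throw new IOException("Serialized Parquet metadata is empty");
  }
  if (bytes.length > MAX_SERIALIZED_FOOTER_BYTES) {
    throw new IOException("Serialized Parquet metadata exceeds the configured limit");
  }
  ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
  FileMetaData fileMetaData = Util.readFileMetaData(inputStream);
  return converter.fromParquetMetadata(fileMetaData);
}
```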
3. Exception Information Disclosure (multiple locations)

```java
throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(path));
```

Issue: Stack traces and error messages might expose internal file system structure.

Test Coverage

Critical Gap

❌ No test files found for the new classes.
Recommendations

1. Add Unit Tests for ParquetMetadataSerializer (a round-trip sketch follows this list)

Suggested test cases:
- testSerializeDeserializeRoundTrip()
- testDeserializeEmptyBytes()
- testDeserializeCorruptedBytes()
- testSerializeLargeMetadata()
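A hypothetical JUnit-style sketch of the round-trip case; it assumes ParquetMetadataSerializer exposes serialize(ParquetMetadata) and deserialize(byte[]) as referenced elsewhere in this review, and the footer fixture helper is invented for illustration:

```java
// Round-trip sketch: serialize a real footer, deserialize it, and compare a
// few stable properties rather than relying on object equality.
@Test
public void testSerializeDeserializeRoundTrip() throws Exception {
  ParquetMetadata original = readFooterOfSmallTestFile(); // hypothetical fixture helper
  ParquetMetadataSerializer serializer = new ParquetMetadataSerializer();

  byte[] bytes = serializer.serialize(original);
  ParquetMetadata restored = serializer.deserialize(bytes);

  assertEquals(original.getBlocks().size(), restored.getBlocks().size());
  assertEquals(
      original.getFileMetaData().getSchema(), restored.getFileMetaData().getSchema());
}
```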
2. Add Tests for IcebergCometNativeBatchReader

Suggested test cases:
- testInitWithValidMetadata()
- testInitWithInvalidMetadata()
- testPreInitializedReaders()
- testMissingRequiredColumnsWithPreInitReaders()

3. Add Tests for FileInfo

Suggested test cases:
- testFileInfoFromPartitionedFile()
- testFileInfoWithMissingScheme()
- testFileInfoWithRelativePath()
- testPathUriGeneration()

4. Integration Tests

Dependency Changes

Positive
✅ Added the parquet-format-structures dependency.

Verification Needed

Summary of Recommendations

Must Fix (Before Merge)

Should Fix

Nice to Have

Overall, this is a solid refactoring that enables important Iceberg functionality. The main concerns are around test coverage, array bounds checking, and input validation. With these addressed, this will be a strong addition to the codebase.
Review completed. No suggestions at this time.
Comment augment review to trigger a new review at any time.
Actionable comments posted: 2
🧹 Nitpick comments (1)
common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java (1)
55-60: Replace magic number with ParquetFileWriter constant.

Using a hard-coded 1 risks drifting if Parquet bumps its writer version. Please delegate to the official constant to stay aligned automatically.

Apply this diff:

```diff
@@
-import org.apache.parquet.format.FileMetaData;
-import org.apache.parquet.format.Util;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
@@
-    FileMetaData fileMetaData = converter.toParquetMetadata(1, metadata);
+    FileMetaData fileMetaData =
+        converter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- common/pom.xml (1 hunks)
- common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java (1 hunks)
- common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java (1 hunks)
- common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (11 hunks)
- common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java (1 hunks)
- native/core/Cargo.toml (1 hunks)
- pom.xml (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (3)
common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala (2)
- NativeConfig (30-88)
- extractObjectStoreOptions (58-87)

common/src/main/java/org/apache/comet/parquet/ReadOptions.java (1)
- ReadOptions (36-210)

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala (1)
- toArrowSchema (191-195)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Cursor Bugbot
- GitHub Check: codex
- GitHub Check: claude-review
- GitHub Check: spark-sql-catalyst/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/hive-1/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/core-2/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/core-1/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/hive-3/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/hive-2/ubuntu-24.04/spark-4.0.1/java-17
- GitHub Check: spark-sql-sql/core-3/ubuntu-24.04/spark-4.0.1/java-17
```java
        // Keep the column if:
        // 1. It doesn't have a preinitialized reader, OR
        // 2. It has a preinitialized reader but exists in fileSchema
        boolean hasPreInitializedReader =
            i < preInitializedReaders.length && preInitializedReaders[i] != null;
        int finalI = i;
        boolean existsInFileSchema =
            fileFields.stream().anyMatch(f -> f.getName().equals(sparkFields[finalI].name()));

        if (!hasPreInitializedReader || existsInFileSchema) {
          filteredSchema = filteredSchema.add(sparkFields[i]);
        }
      }
      sparkSchema = filteredSchema;
```
Respect case-insensitive schema resolution when filtering Spark fields
Line 535 currently does a hard equals between Spark and Parquet column names. In sessions where isCaseSensitive is false (Spark’s default), legitimate columns whose casing differs (common with Hive/Iceberg metadata) are treated as “missing”, so we drop them from sparkSchema and later fail to produce data for them. Please compare using the same resolver as the rest of the reader—e.g., branch on isCaseSensitive and fall back to equalsIgnoreCase—so case-insensitive schemas keep working.
🤖 Prompt for AI Agents
In common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java around
lines 528 to 541, the name comparison when deciding whether a Spark field exists
in the Parquet file uses a case-sensitive equals which breaks behaviour when
Spark is configured case-insensitive; change the existence check to respect the
reader’s case-sensitivity setting (i.e., if isCaseSensitive use equals,
otherwise use a case-insensitive comparison like equalsIgnoreCase or the same
resolver used elsewhere in this class) so fields that differ only by case are
not dropped from filteredSchema; update the anyMatch predicate accordingly to
use that conditional comparison.
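A sketch of the suggested predicate, using the names from the diff above and an isCaseSensitive flag as described in this comment:

```java
// Sketch: pick the comparison based on the session's case-sensitivity setting
// instead of a hard equals, so columns differing only by case are kept.
int finalI = i;
boolean existsInFileSchema =
    fileFields.stream()
        .anyMatch(
            f ->
                isCaseSensitive
                    ? f.getName().equals(sparkFields[finalI].name())
                    : f.getName().equalsIgnoreCase(sparkFields[finalI].name()));
```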
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct that Apache Spark has a special configuration setting to decide whether to compare the field names case sensitive or not. The finding prevents issues in Parquet <-> Spark field mappings.
@martin-augment, thank you for confirming the finding! I'm glad the case-sensitivity analysis was helpful in identifying a potential issue with Parquet ↔ Spark field mappings. Let me know if you need any assistance implementing the fix or have questions about the suggested approach.
```java
      // Find the corresponding spark field by matching field names
      DataType dataType = null;
      int sparkSchemaIndex = -1;
      for (int j = 0; j < sparkFields.length; j++) {
        if (sparkFields[j].name().equals(field.getName())) {
          dataType = sparkFields[j].dataType();
          sparkSchemaIndex = j;
          break;
        }
      }

      if (dataType == null) {
        throw new IOException(
            "Could not find matching Spark field for Parquet field: " + field.getName());
      }
```
Column lookup must honor case-insensitive mode
When we locate the Spark field for a Parquet column, we again rely on a case-sensitive equals. With isCaseSensitive == false, the lookup falls through and throws IOException: Could not find matching Spark field... even though the column exists (only case differs). Please resolve using the configured resolver (e.g., isCaseSensitive ? equals : equalsIgnoreCase) before throwing, otherwise Iceberg/Hive workloads in the default case-insensitive mode break.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct that Apache Spark has a special configuration setting to decide whether to compare the field names case sensitive or not. The finding prevents issues in Parquet <-> Spark field mappings.
@martin-augment, thank you for confirming this finding! I'm glad the review helped identify this issue before it could cause problems with case-insensitive Parquet field mappings in production workloads. Your feedback helps me improve future reviews. 🙏
(\___/)
(='.'=) Happy to help catch that bug!
(")_(")
✏️ Learnings added
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 14
File: common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:998-1012
Timestamp: 2025-11-10T07:10:16.234Z
Learning: In NativeBatchReader.java (Java, Apache Comet Parquet reader), when matching Spark field names to Parquet field names, always respect the `isCaseSensitive` configuration flag. Use case-sensitive comparison (`equals`) when `isCaseSensitive == true`, and case-insensitive comparison (`equalsIgnoreCase`) when `isCaseSensitive == false`. This prevents field lookup failures in Iceberg/Hive workloads that run in the default case-insensitive mode.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Findings
value:useful; category:bug; feedback: The Codex AI reviewer is correct that the PR changes the timezone from the configured one (spark.sql.session.timeZone) to a hard-coded UTC.
value:incorrect-but-reasonable; category:bug; feedback: The Claude AI reviewer is not correct. Spark's StructType#fields() returns an empty array, so there is no chance for NullPointerException (https://github.com/apache/spark/blob/79d0c427878f3fc9a95375679934ce6d59595014/sql/api/src/main/scala/org/apache/spark/sql/types/StructType.scala#L108)
value:useful; category:bug; feedback: The Claude AI reviewer is correct that there is no check whether preInitializedReaders has an item for index i before accessing it.
value:useful; category:bug; feedback: The Claude AI reviewer is correct that there is no check whether preInitializedReaders has an item for columnIndex before accessing it.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct that the method's throws clause is broader than it needs to be. Since the method could throw only one type of exception (URISyntaxException), it is better to be specific here instead of using the more generic java.lang.Exception. This way the callers of this method can handle it in a more specific way.
value:good-but-wont-fix; category:bug; feedback: The Claude AI reviewer is correct that these lines in the code might be expensive if used in a hot path, but usually there is no need to optimize something until it shows up in the CPU profiler as slow.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct that these loops could be optimized to execute faster. But this depends on whether the fields should always be compared in a case-sensitive way or not. The finding prevents slower execution when comparing the Parquet fields with the Spark ones.
2680: To review by AI
Note
Introduces an Iceberg-specific native batch reader using serialized Parquet metadata, refactors the core reader to support FileInfo and pre-initialized readers, and updates Parquet/deps (incl. parquet-format-structures) plus bumps Rust uuid.
- IcebergCometNativeBatchReader accepting Thrift-encoded ParquetMetadata bytes; two-step init via FileInfo; supports passing preInitializedReaders.
- NativeBatchReader: FileInfo abstraction (URI normalization) and an alternate constructor; fields widened to protected.
- preInitializedReaders support (schema filtering, required-column checks, and reader reuse).
- Timezone fixed to UTC; Spark field matching by name; uses CometInputFile.fromPath(...) without footer; object store options via pathUri; key retriever keyed by filePath.
- Added getColumnIndexFromParquetColumn and updated checkParquetType to allow preinitialized required columns.
- AbstractColumnReader adds getPath().
- ParquetMetadataSerializer to serialize/deserialize Parquet metadata bytes.
- Added org.apache.parquet:parquet-format-structures (and excluded it from Spark deps); wired into common/pom.xml and root pom.xml dependency management.
- Bumped uuid to 1.18.1 in native/core/Cargo.toml.

Written by Cursor Bugbot for commit d8cd7b7. This will update automatically on new commits.