Conversation

@martin-augment martin-augment (Owner) commented Nov 4, 2025

2680: To review by AI


Note

Adds an Iceberg-specific batch reader that initializes from serialized Parquet metadata, refactors NativeBatchReader with FileInfo and preinitialized column support, and updates Parquet/Rust dependencies.

  • Parquet Readers:
    • Introduce IcebergCometNativeBatchReader to initialize from FileInfo and serialized ParquetMetadata bytes; exposes getSparkSchema.
    • Refactor NativeBatchReader:
      • Add FileInfo abstraction and alternate constructor; allow init without PartitionedFile.
      • Support preInitializedReaders and filter them from sparkSchema before native call.
      • Update missing/required column checks to allow preinitialized readers; add helper getColumnIndexFromParquetColumn.
      • Use UTC for Arrow schema conversion; derive object store options from pathUri.
      • Adjust encryption key retriever to use filePath; broaden fields/methods to protected; add no-arg protected ctor.
      • Update FileReader construction (remove footer arg) and related range/path handling.
    • AbstractColumnReader: add getPath().
  • Serialization:
    • Add ParquetMetadataSerializer to serialize/deserialize ParquetMetadata via parquet-format-structures (see the sketch after this list).
  • Dependencies:
    • Add org.apache.parquet:parquet-format-structures; update exclusions in parent pom.xml.
    • Rust: change opendal to 0.54.0; bump uuid to 1.18.1.
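
As a reading aid, here is a minimal sketch of the serializer round trip described in the ParquetMetadataSerializer bullet above, assuming the standard parquet-mr conversion path (ParquetMetadataConverter from parquet-hadoop plus the thrift Util from parquet-format-structures); the PR's actual implementation may differ in details:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Sketch of a ParquetMetadata <-> byte[] round trip through the thrift
// FileMetaData intermediate representation; not the PR's exact code.
public class ParquetMetadataSerializerSketch {
  private final ParquetMetadataConverter converter = new ParquetMetadataConverter();

  public byte[] serialize(ParquetMetadata metadata) throws IOException {
    // ParquetMetadata -> thrift FileMetaData -> compact thrift bytes
    FileMetaData thriftMeta = converter.toParquetMetadata(1, metadata);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Util.writeFileMetaData(thriftMeta, out);
    return out.toByteArray();
  }

  public ParquetMetadata deserialize(byte[] bytes) throws IOException {
    // compact thrift bytes -> thrift FileMetaData -> ParquetMetadata
    FileMetaData thriftMeta = Util.readFileMetaData(new ByteArrayInputStream(bytes));
    return converter.fromParquetMetadata(thriftMeta);
  }
}
```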

Written by Cursor Bugbot for commit 6b551d7. This will update automatically on new commits.

@coderabbitai coderabbitai bot commented Nov 4, 2025

Walkthrough

This PR introduces parquet-format-structures dependency management, adds a FileInfo abstraction to NativeBatchReader for flexible input-source handling, creates IcebergCometNativeBatchReader for Iceberg integration with serialized-metadata support, and adds a ParquetMetadataSerializer utility for metadata serialization.

Changes

• Dependency Management (common/pom.xml, pom.xml, native/core/Cargo.toml):
  Added the parquet-format-structures Maven dependency with version management in the root and common pom.xml; updated Rust dependencies uuid from 1.0 to 1.18.1 and opendal from 0.54.1 to 0.54.0.
• Parquet Metadata Serialization (common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java):
  New utility class for serializing/deserializing ParquetMetadata to/from byte arrays using a FileMetaData intermediate representation.
• Reader Infrastructure (common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java, common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java):
  Added a getPath() method to AbstractColumnReader; introduced the FileInfo abstraction with URI handling, a preInitializedReaders field, a protected no-arg constructor, and a new FileInfo-based constructor in NativeBatchReader.
• Iceberg Integration (common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java):
  New IcebergCometNativeBatchReader class extending NativeBatchReader, initialized from serialized ParquetMetadata bytes via a two-step initialization pattern (dedicated init method), with a sparkSchema accessor (see the usage sketch after this list).
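
A hedged usage sketch of the two-step initialization pattern noted for Iceberg Integration above: construct the reader, then call init with the file descriptor and the serialized footer bytes. The parameter list mirrors the field assignments quoted later in this thread; the actual signature in the PR may order or type them differently.

```java
// Sketch only: argument names follow the init() excerpt reviewed below and
// are not guaranteed to match the PR's exact signature.
IcebergCometNativeBatchReader reader = new IcebergCometNativeBatchReader();
reader.init(
    conf,                  // Hadoop Configuration
    fileInfo,              // FileInfo describing the Parquet file (path, range)
    parquetMetadataBytes,  // footer serialized via ParquetMetadataSerializer
    nativeFilter,
    capacity,
    dataSchema,
    isCaseSensitive,
    useFieldId,
    ignoreMissingIds,
    useLegacyDateTimestamp,
    partitionSchema,
    partitionValues,
    preInitializedReaders,
    metrics);
StructType sparkSchema = reader.getSparkSchema(); // schema available after init
```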

@augmentcode augmentcode bot left a comment

Review completed. 1 suggestion posted.

```python
    deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
    print(f"Query {query} results written to {output_path}")
else:
    print(f"Skipping write: DataFrame has no schema for {output_path}")
```
output_path is referenced here but is only defined inside the preceding if-block, which will raise a NameError when write_path is set and the DataFrame has no schema (empty columns).
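
One possible fix, as a hedged sketch: hoist the output_path definition above the branch. Its derivation from write_path and query below is an assumption for illustration, not taken from the PR.

```python
# Sketch: define output_path before branching so the else arm cannot hit a
# NameError. The f-string derivation below is assumed, not from the PR.
output_path = f"{write_path}/q{query}"
if deduped.columns:
    deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
    print(f"Query {query} results written to {output_path}")
else:
    print(f"Skipping write: DataFrame has no schema for {output_path}")
```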


@martin-augment martin-augment force-pushed the pr-2680-2025-11-04-13-05-30 branch from d73bcba to 6b551d7 on November 4, 2025, 13:09
```java
this.preInitializedReaders = preInitializedReaders;

// Call parent init method
super.init();
```
Cursor Bugbot left a comment
Bug: Legacy Date Timestamp Not Honored in Init

The IcebergCometNativeBatchReader.init() method receives the useLegacyDateTimestamp parameter but doesn't assign it to the inherited this.useLegacyDateTimestamp field. This causes super.init() to use the field's default value (false) instead of the provided parameter, potentially leading to incorrect date/timestamp handling.
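
A minimal fix sketch, using the inherited field name cited in this thread:

```java
// Sketch of the fix: assign the incoming parameter to the inherited
// NativeBatchReader field before super.init() consults it; otherwise the
// default (false) is used.
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
```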


martin-augment (Owner, Author) replied:
value:useful; category:bug; feedback: The AI reviewer is correct that the passed function argument was not stored; IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), which may lead to wrongly parsed dates and timestamps.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a5b13c and 6b551d7.

📒 Files selected for processing (7)
  • common/pom.xml (1 hunks)
  • common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java (1 hunks)
  • common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java (1 hunks)
  • common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (10 hunks)
  • common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java (1 hunks)
  • native/core/Cargo.toml (1 hunks)
  • pom.xml (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java (3)
common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala (2)
  • NativeConfig (30-88)
  • extractObjectStoreOptions (58-87)
common/src/main/java/org/apache/comet/parquet/ReadOptions.java (1)
  • ReadOptions (36-210)
common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala (1)
  • toArrowSchema (191-195)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor Bugbot

Comment on lines +58 to +74
```java
    // Set parent fields
    this.conf = conf;
    this.fileInfo = fileInfo;
    this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
    this.nativeFilter = nativeFilter;
    this.capacity = capacity;
    this.dataSchema = dataSchema;
    this.isCaseSensitive = isCaseSensitive;
    this.useFieldId = useFieldId;
    this.ignoreMissingIds = ignoreMissingIds;
    this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
    this.preInitializedReaders = preInitializedReaders;

    // Call parent init method
    super.init();
  }
```
coderabbitai bot commented Nov 4, 2025
⚠️ Potential issue | 🟠 Major

Propagate legacy timestamp handling (and metrics) into the parent state

useLegacyDateTimestamp gets passed into this init method but never stored on the parent, so the downstream NativeColumnReader creation always sees the default false. In legacy-read scenarios this flips the behavior (either rebasing unexpectedly or throwing) and gives wrong results. While you're here, the incoming metrics map is ignored as well, so all native metrics remain unset on the Iceberg path.

Please wire both through before calling super.init():

```diff
     this.partitionSchema = partitionSchema;
     this.partitionValues = partitionValues;
-    this.preInitializedReaders = preInitializedReaders;
+    this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+    this.preInitializedReaders = preInitializedReaders;
+    this.metrics.clear();
+    if (metrics != null) {
+      this.metrics.putAll(metrics);
+    }
```
🤖 Prompt for AI Agents

In common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java around lines 58 to 74, the init method assigns many fields but does not propagate the incoming useLegacyDateTimestamp flag nor the metrics map into the parent state before calling super.init(), causing NativeColumnReader creation to see defaults and metrics to be lost; fix by assigning the parent's corresponding fields (store useLegacyDateTimestamp and the incoming metrics map into the parent/member variables used by the superclass) prior to invoking super.init(), then call super.init() so legacy timestamp handling and native metrics are honored.

martin-augment (Owner, Author) replied:

value:useful; category:bug; feedback: The AI reviewer is correct that the passed function argument was not stored; IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), which may lead to wrongly parsed dates and timestamps.
The AI reviewer is also correct that the metrics argument is missed. This will lead to using the inherited metrics from NativeBatchReader, which most probably will be incorrect.

coderabbitai bot replied:

@martin-augment Thank you for confirming the analysis and providing the detailed feedback! I'm glad the review caught both the useLegacyDateTimestamp and metrics issues before they could cause incorrect date/timestamp parsing and metrics reporting in production.

The suggested fix will ensure that both fields are properly propagated to the parent class before super.init() is called, so the Iceberg reader will honor the legacy timestamp handling flag and correctly track native metrics.

Comment on lines +522 to +533
```java
    // Filter out columns with preinitialized readers from sparkSchema before making the
    // call to native
    if (preInitializedReaders != null) {
      StructType filteredSchema = new StructType();
      StructField[] sparkFields = sparkSchema.fields();
      for (int i = 0; i < sparkFields.length; i++) {
        if (i >= preInitializedReaders.length || preInitializedReaders[i] == null) {
          filteredSchema = filteredSchema.add(sparkFields[i]);
        }
      }
      sparkSchema = filteredSchema;
    }
```
coderabbitai bot commented Nov 4, 2025
⚠️ Potential issue | 🔴 Critical

Filtering sparkSchema here breaks column index alignment

Dropping pre-initialized columns by overwriting sparkSchema causes two failures:

  1. Later in loadNextBatch we still iterate i over the original Parquet field count. As soon as a pre-initialized column appears before another projected column, sparkSchema.fields()[i] trips an ArrayIndexOutOfBoundsException because the filtered schema is now shorter.
  2. We pass i straight into NativeColumnReader(handle, i, …). After removing entries from the Arrow schema, native column i no longer exists, so we either read the wrong column or crash.

Net effect: any file that relies on preInitializedReaders and projects multiple columns will blow up at runtime.

We need to keep the stored sparkSchema (and column indexes) intact and introduce a separate mapping for the subset we expose to native—e.g., keep the original schema, build a filtered schema just for Arrow/native, and track the mapping from requested column index → native column index when instantiating NativeColumnReader. Until that mapping is in place, we cannot mutate sparkSchema here.
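
A minimal sketch of that direction, with illustrative names (nativeSchema, nativeColumnIndex are not from the PR): keep sparkSchema untouched, expose only non-pre-initialized columns to native, and translate indexes when creating NativeColumnReader.

```java
// Sketch: build a native-only schema plus an index translation table instead
// of mutating sparkSchema. Names here are illustrative, not from the PR.
StructType nativeSchema = new StructType();
int[] nativeColumnIndex = new int[sparkSchema.fields().length];
int next = 0;
for (int i = 0; i < sparkSchema.fields().length; i++) {
  boolean preInitialized =
      preInitializedReaders != null
          && i < preInitializedReaders.length
          && preInitializedReaders[i] != null;
  if (preInitialized) {
    nativeColumnIndex[i] = -1; // column is served by the pre-initialized reader
  } else {
    nativeColumnIndex[i] = next++; // position of column i in the native schema
    nativeSchema = nativeSchema.add(sparkSchema.fields()[i]);
  }
}
// loadNextBatch keeps iterating over the full sparkSchema; native-backed
// columns would then use: new NativeColumnReader(handle, nativeColumnIndex[i], ...)
```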

martin-augment (Owner, Author) replied:
value:useful; category:bug; feedback: The AI reviewer is correct that the number of fields in sparkSchema may be reduced when there are preInitializedReaders, which may lead to an ArrayIndexOutOfBoundsException when trying to read a field by an index beyond the list size. This finding would prevent runtime errors!

coderabbitai bot replied:

@martin-augment Thank you for confirming! I'm glad the analysis helped catch this issue before it caused runtime problems. The original comment outlines a path forward—keeping the full sparkSchema intact for indexing while creating a separate filtered schema for the native layer, along with an index mapping for NativeColumnReader. Let me know if you'd like me to help explore the fix further or if you have any questions!

@martin-augment martin-augment (Owner, Author) commented:

augment review

@augmentcode augmentcode bot left a comment

Review completed. 1 suggestion posted.

```java
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;

/**
```
The class Javadoc says ParquetMetadata is passed as a JSON string, but this API takes raw bytes and uses Parquet’s serializer; consider updating the comment to say “serialized bytes” instead of “JSON string.”
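
A possible rewording, sketched here with the surrounding Javadoc content assumed:

```java
// Hedged rewording sketch; the actual Javadoc in the PR may differ.
/**
 * Batch reader for Iceberg that initializes from a FileInfo and a footer
 * supplied as serialized ParquetMetadata bytes (not a JSON string).
 */
```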


martin-augment (Owner, Author) replied:

value:useful; category:documentation; feedback: The AI reviewer is correct that the Javadoc is inaccurate. https://github.com/apache/datafusion-comet/pull/2680/files#diff-e57878f6cd8036999500de5719f8f4bbe28e1ed5dcb79a02ad7d7eb206f37473R44 shows that the input is provided as a byte array, not a String. This would prevent confusion for developers about how to use this new API.
