common/pom.xml (4 additions, 0 deletions)

@@ -51,6 +51,10 @@ under the License.
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-format-structures</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-vector</artifactId>
@@ -89,6 +89,10 @@ ColumnDescriptor getDescriptor() {
    return descriptor;
  }

  String getPath() {
    return String.join(".", this.descriptor.getPath());
  }

/**
* Set the batch size of this reader to be 'batchSize'. Also initializes the native column reader.
*/
common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java (new file)

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.parquet;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;

/**
 * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as a JSON string. This
 * allows Iceberg to pass metadata in serialized form with a two-step initialization pattern.
 */

Review comment:

The class Javadoc says ParquetMetadata is passed as a JSON string, but this API takes raw bytes and uses Parquet's serializer; consider updating the comment to say "serialized bytes" instead of "JSON string."

Reply (Owner Author):

value:useful; category:documentation; feedback: The AI reviewer is correct that the Javadoc is inaccurate. https://github.com/apache/datafusion-comet/pull/2680/files#diff-e57878f6cd8036999500de5719f8f4bbe28e1ed5dcb79a02ad7d7eb206f37473R44 shows that the input is provided as a byte array, not a String. Fixing the comment would prevent confusion for developers about how to use this new API.
public class IcebergCometNativeBatchReader extends NativeBatchReader {

  public IcebergCometNativeBatchReader(StructType requiredSchema) {
    super();
    this.sparkSchema = requiredSchema;
  }

  /** Initialize the reader using FileInfo instead of PartitionedFile. */
  public void init(
      Configuration conf,
      FileInfo fileInfo,
      byte[] parquetMetadataBytes,
      byte[] nativeFilter,
      int capacity,
      StructType dataSchema,
      boolean isCaseSensitive,
      boolean useFieldId,
      boolean ignoreMissingIds,
      boolean useLegacyDateTimestamp,
      StructType partitionSchema,
      InternalRow partitionValues,
      AbstractColumnReader[] preInitializedReaders,
      Map<String, SQLMetric> metrics)
      throws Throwable {

    // Set parent fields
    this.conf = conf;
    this.fileInfo = fileInfo;
    this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
    this.nativeFilter = nativeFilter;
    this.capacity = capacity;
    this.dataSchema = dataSchema;
    this.isCaseSensitive = isCaseSensitive;
    this.useFieldId = useFieldId;
    this.ignoreMissingIds = ignoreMissingIds;
    this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
    this.preInitializedReaders = preInitializedReaders;

    // Call parent init method
    super.init();
  }

Review comment:

Bug: Legacy Date Timestamp Not Honored in Init

The IcebergCometNativeBatchReader.init() method receives the useLegacyDateTimestamp parameter but doesn't assign it to the inherited this.useLegacyDateTimestamp field. This causes super.init() to use the field's default value (false) instead of the provided parameter, potentially leading to incorrect date/timestamp handling.

Reply (Owner Author):

value:useful; category:bug; feedback: The AI reviewer is correct that the passed argument is not stored, so IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), which may lead to wrongly parsed dates and timestamps.
Comment on lines +58 to +74 — @coderabbitai bot, Nov 4, 2025:

⚠️ Potential issue | 🟠 Major

Propagate legacy timestamp handling (and metrics) into the parent state

useLegacyDateTimestamp gets passed into this init method but never stored on the parent, so the downstream NativeColumnReader creation always sees the default false. In legacy-read scenarios this flips the behavior (either rebasing unexpectedly or throwing) and gives wrong results. While you're here, the incoming metrics map is ignored as well, so all native metrics remain unset on the Iceberg path.

Please wire both through before calling super.init():

     this.partitionSchema = partitionSchema;
     this.partitionValues = partitionValues;
-    this.preInitializedReaders = preInitializedReaders;
+    this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+    this.preInitializedReaders = preInitializedReaders;
+    this.metrics.clear();
+    if (metrics != null) {
+      this.metrics.putAll(metrics);
+    }

🤖 Prompt for AI Agents: In common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java around lines 58 to 74, assign the incoming useLegacyDateTimestamp flag and metrics map to the corresponding parent fields before calling super.init(), so legacy timestamp handling and native metrics are honored.

Reply (Owner Author):

value:useful; category:bug; feedback: The AI reviewer is correct that the passed argument is not stored, so IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java), which may lead to wrongly parsed dates and timestamps. The AI reviewer is also correct that the metrics argument is ignored; the Iceberg path would fall back to the metrics inherited from NativeBatchReader, which would most likely be incorrect.

Reply (coderabbitai bot):
@martin-augment Thank you for confirming the analysis and providing the detailed feedback! I'm glad the review caught both the useLegacyDateTimestamp and metrics issues before they could cause incorrect date/timestamp parsing and metrics reporting in production.

The suggested fix will ensure that both fields are properly propagated to the parent class before super.init() is called, so the Iceberg reader will honor the legacy timestamp handling flag and correctly track native metrics.
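The field-propagation bug discussed in this thread can be reproduced in a self-contained sketch. The class and method names below are illustrative, not Comet's: the point is that a subclass init parameter which shadows an inherited field, but is never assigned to it, leaves the parent's init logic reading the field's default.

```java
// Minimal sketch of the reviewed bug pattern: a subclass init(...) parameter
// shadows an inherited field and is never stored, so the parent's init()
// sees the default value instead of the caller's argument.
class BaseReader {
  protected boolean useLegacyDateTimestamp = false; // default, as in the parent reader

  protected String init() {
    // Parent logic keys off the inherited field, not the subclass parameter.
    return useLegacyDateTimestamp ? "legacy-rebase" : "no-rebase";
  }
}

class BuggyReader extends BaseReader {
  String init(boolean useLegacyDateTimestamp) {
    // BUG: the parameter is never assigned to this.useLegacyDateTimestamp.
    return super.init(); // always sees false
  }
}

class FixedReader extends BaseReader {
  String init(boolean useLegacyDateTimestamp) {
    this.useLegacyDateTimestamp = useLegacyDateTimestamp; // propagate before super.init()
    return super.init();
  }
}

public class PropagationDemo {
  public static void main(String[] args) {
    System.out.println(new BuggyReader().init(true)); // prints "no-rebase" (wrong)
    System.out.println(new FixedReader().init(true)); // prints "legacy-rebase" (correct)
  }
}
```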


  public StructType getSparkSchema() {
    return this.sparkSchema;
  }
}
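The class Javadoc above describes a two-step initialization pattern: a cheap constructor that only takes the required Spark schema, followed by a heavier init() that receives the serialized footer bytes. A minimal stdlib-only sketch of that pattern follows; all names here are illustrative (the real reader deserializes ParquetMetadata from the bytes, not a string).

```java
// Hypothetical sketch of the two-step initialization pattern: construct
// first with cheap, always-available state, then init() later with the
// serialized payload once the caller (e.g. Iceberg) has produced it.
import java.nio.charset.StandardCharsets;

class TwoStepReader {
  private final String requiredSchema;
  private String footer; // populated by init(), stands in for the deserialized metadata

  TwoStepReader(String requiredSchema) { // step 1: cheap construction
    this.requiredSchema = requiredSchema;
  }

  void init(byte[] footerBytes) { // step 2: deserialize the payload
    this.footer = new String(footerBytes, StandardCharsets.UTF_8);
  }

  String describe() {
    if (footer == null) {
      throw new IllegalStateException("init() not called");
    }
    return requiredSchema + ":" + footer;
  }
}

public class TwoStepDemo {
  public static void main(String[] args) {
    TwoStepReader reader = new TwoStepReader("id,ts");
    reader.init("footer-v1".getBytes(StandardCharsets.UTF_8));
    System.out.println(reader.describe()); // prints "id,ts:footer-v1"
  }
}
```

The pattern's trade-off, visible in the describe() guard, is that the object is unusable between the two steps, which is why the real init() must set every inherited field the parent relies on.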