2680: chore: various refactoring changes for iceberg #5
@@ -0,0 +1,79 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.parquet;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;

/**
 * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as a JSON string. This
 * allows Iceberg to pass metadata in serialized form with a two-step initialization pattern.
 */
public class IcebergCometNativeBatchReader extends NativeBatchReader {

  public IcebergCometNativeBatchReader(StructType requiredSchema) {
    super();
    this.sparkSchema = requiredSchema;
  }

  /** Initialize the reader using FileInfo instead of PartitionedFile. */
  public void init(
      Configuration conf,
      FileInfo fileInfo,
      byte[] parquetMetadataBytes,
      byte[] nativeFilter,
      int capacity,
      StructType dataSchema,
      boolean isCaseSensitive,
      boolean useFieldId,
      boolean ignoreMissingIds,
      boolean useLegacyDateTimestamp,
      StructType partitionSchema,
      InternalRow partitionValues,
      AbstractColumnReader[] preInitializedReaders,
      Map<String, SQLMetric> metrics)
      throws Throwable {

    // Set parent fields
    this.conf = conf;
    this.fileInfo = fileInfo;
    this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
    this.nativeFilter = nativeFilter;
    this.capacity = capacity;
    this.dataSchema = dataSchema;
    this.isCaseSensitive = isCaseSensitive;
    this.useFieldId = useFieldId;
    this.ignoreMissingIds = ignoreMissingIds;
    this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
    this.preInitializedReaders = preInitializedReaders;

    // Call parent init method
    super.init();
Review comment:

Bug: Legacy Date Timestamp Not Honored in Init

Owner (Author):
value:useful; category:bug; feedback:The AI reviewer is correct that the passed function argument was not stored, so IcebergCometNativeBatchReader will always use the default value of false.
  }
Comment on lines +58 to +74:

Propagate legacy timestamp handling (and metrics) into the parent state. Please wire both through before calling super.init() (a sketch of the method tail with this change applied appears after the diff below):

      this.partitionSchema = partitionSchema;
      this.partitionValues = partitionValues;
-     this.preInitializedReaders = preInitializedReaders;
+     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+     this.preInitializedReaders = preInitializedReaders;
+     this.metrics.clear();
+     if (metrics != null) {
+       this.metrics.putAll(metrics);
+     }

Owner (Author):
value:useful; category:bug; feedback:The AI reviewer is correct that the passed function argument was not stored and this way IcebergCometNativeBatchReader will always use the default value of false (https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java) and this may lead to wrongly parsed dates and timestamps.

Reviewer:
@martin-augment Thank you for confirming the analysis and providing the detailed feedback! I'm glad the review caught both the useLegacyDateTimestamp and metrics issues. The suggested fix will ensure that both fields are properly propagated to the parent class before super.init() is called.
  public StructType getSparkSchema() {
    return this.sparkSchema;
  }
}
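To make the suggested change above concrete, here is a minimal sketch of how the tail of init would read with it applied. It assumes, as the suggestion does, that the parent NativeBatchReader exposes a useLegacyDateTimestamp field and a pre-initialized metrics map to subclasses; it is not part of this PR.

    // Tail of init(...) with the reviewer's suggestion applied (sketch only):
    this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
    this.useLegacyDateTimestamp = useLegacyDateTimestamp; // previously dropped; the parent default (false) was always used
    this.preInitializedReaders = preInitializedReaders;
    this.metrics.clear(); // discard any metrics left over from an earlier init
    if (metrics != null) {
      this.metrics.putAll(metrics);
    }

    // Parent state must be fully populated before delegating:
    super.init();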
Review comment:

The class Javadoc says ParquetMetadata is passed as a JSON string, but this API takes raw bytes and uses Parquet's serializer; consider updating the comment to say "serialized bytes" instead of "JSON string."
Owner (Author):
value:useful; category:documentation; feedback:The AI reviewer is correct that the Javadoc documentation is not correct. https://github.com/apache/datafusion-comet/pull/2680/files#diff-e57878f6cd8036999500de5719f8f4bbe28e1ed5dcb79a02ad7d7eb206f37473R44 shows that the input is provided as a byte array, not a String. Fixing the comment would prevent confusion for developers about how to use this new API.
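As a usage note, the class Javadoc describes a two-step initialization pattern (construct with the required schema, then call init). Below is a hypothetical caller-side sketch of that pattern under the corrected "serialized bytes" reading of the metadata argument. Everything here is illustrative: the FileInfo, schemas, and metrics are assumed to come from the Iceberg read path, the flag and capacity values are placeholders, and ParquetMetadataSerializer.serialize(...) is assumed to be the counterpart of the deserialize(...) call used inside init.

package org.apache.comet.parquet;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;

// Hypothetical caller sketch (not part of the PR). FileInfo, footer, schemas, and
// metrics are assumed to be supplied by the Iceberg read path.
class IcebergReaderUsageSketch {

  IcebergCometNativeBatchReader open(
      Configuration conf,
      FileInfo fileInfo,
      ParquetMetadata footer,
      StructType requiredSchema,
      StructType dataSchema,
      StructType partitionSchema,
      InternalRow partitionValues,
      Map<String, SQLMetric> metrics)
      throws Throwable {

    // Step 1: construct with the Spark schema the scan should produce.
    IcebergCometNativeBatchReader reader = new IcebergCometNativeBatchReader(requiredSchema);

    // The footer travels as serialized bytes (not JSON); serialize(...) is assumed to be
    // the counterpart of the deserialize(...) call inside init().
    byte[] footerBytes = new ParquetMetadataSerializer().serialize(footer);

    // Step 2: populate the remaining reader state and delegate to the parent init.
    reader.init(
        conf,
        fileInfo,
        footerBytes,
        /* nativeFilter */ null,
        /* capacity */ 4096,
        dataSchema,
        /* isCaseSensitive */ false,
        /* useFieldId */ true,
        /* ignoreMissingIds */ false,
        /* useLegacyDateTimestamp */ false,
        partitionSchema,
        partitionValues,
        /* preInitializedReaders */ null,
        metrics);

    return reader;
  }
}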