feat: Spark-4.0 widening type support #604

Merged · 12 commits · Jul 18, 2024
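For context: Spark 4.0's type-widening feature, which this PR wires into Comet's native Parquet reader, lets a file written with a narrower type be read under a wider Spark schema. A minimal spark-shell-style sketch of the user-visible behavior (session setup and the /tmp path are my assumptions, not part of the PR):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // Write a column whose Parquet physical type is INT32 ...
    Seq(1, 2, Int.MaxValue).toDF("a").write.mode("overwrite").parquet("/tmp/widen_int")

    // ... then read it back under a wider schema. With widening support the
    // scan promotes INT32 -> INT64 instead of failing with a schema mismatch.
    spark.read.schema("a BIGINT").parquet("/tmp/widen_int").show()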
common/src/main/java/org/apache/comet/parquet/Native.java (1 addition, 0 deletions)
@@ -70,6 +70,7 @@ public static native long initColumnReader(
       int maxDl,
       int maxRl,
       int bitWidth,
+      int expectedBitWidth,
       boolean isSigned,
       int typeLength,
       int precision,
common/src/main/java/org/apache/comet/parquet/TypeUtil.java (37 additions, 6 deletions)
@@ -29,6 +29,7 @@
 import org.apache.parquet.schema.Types;
 import org.apache.spark.package$;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;

 import org.apache.comet.CometConf;
@@ -57,7 +58,9 @@ public static ColumnDescriptor convertToParquet(StructField field) {
     if (type == DataTypes.BooleanType || type == DataTypes.NullType) {
       builder = Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition);
     } else if (type == DataTypes.IntegerType || type instanceof YearMonthIntervalType) {
-      builder = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition);
+      builder =
+          Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
+              .as(LogicalTypeAnnotation.intType(32, true));
     } else if (type == DataTypes.DateType) {
       builder =
           Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
@@ -148,6 +151,12 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) {
         return;
       } else if (sparkType instanceof YearMonthIntervalType) {
         return;
+      } else if (sparkType == DataTypes.DoubleType && isSpark40Plus()) {
+        return;
+      } else if (sparkType == TimestampNTZType$.MODULE$
+          && isSpark40Plus()
+          && logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) {
+        return;
       }
       break;
     case INT64:
@@ -159,11 +168,13 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
         // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
         // fallbacks. We read them as decimal values.
         return;
-      } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MICROS)) {
+      } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MICROS)
+          && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
         validateTimestampType(logicalTypeAnnotation, sparkType);
         // TODO: use dateTimeRebaseMode from Spark side
         return;
-      } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MILLIS)) {
+      } else if (isTimestampTypeMatched(logicalTypeAnnotation, TimeUnit.MILLIS)
+          && (sparkType == TimestampNTZType$.MODULE$ || sparkType == DataTypes.TimestampType)) {
         validateTimestampType(logicalTypeAnnotation, sparkType);
         return;
       }
@@ -266,9 +277,29 @@ private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataTyp
       DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) typeAnnotation;
       // It's OK if the required decimal precision is larger than or equal to the physical decimal
       // precision in the Parquet metadata, as long as the decimal scale is the same.
-      return decimalType.getPrecision() <= d.precision()
-          && (decimalType.getScale() == d.scale()
-              || (isSpark40Plus() && decimalType.getScale() <= d.scale()));
+      return (decimalType.getPrecision() <= d.precision() && decimalType.getScale() == d.scale())
+          || (isSpark40Plus()
+              && (!SQLConf.get().parquetVectorizedReaderEnabled()
+                  || (decimalType.getScale() <= d.scale()
+                      && decimalType.getPrecision() - decimalType.getScale()
+                          <= d.precision() - d.scale())));
+    } else if (isSpark40Plus()) {
+      boolean isNullTypeAnnotation = typeAnnotation == null;
+      boolean isIntTypeAnnotation = typeAnnotation instanceof IntLogicalTypeAnnotation;
+      if (!SQLConf.get().parquetVectorizedReaderEnabled()) {
+        return isNullTypeAnnotation || isIntTypeAnnotation;
+      } else if (isNullTypeAnnotation
+          || (isIntTypeAnnotation && ((IntLogicalTypeAnnotation) typeAnnotation).isSigned())) {
+        PrimitiveType.PrimitiveTypeName typeName =
+            descriptor.getPrimitiveType().getPrimitiveTypeName();
+        int integerPrecision = d.precision() - d.scale();
+        switch (typeName) {
+          case INT32:
+            return integerPrecision >= DecimalType$.MODULE$.IntDecimal().precision();
+          case INT64:
+            return integerPrecision >= DecimalType$.MODULE$.LongDecimal().precision();
+        }
+      }
+    }
     return false;
   }
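Restating the new Spark 4.0 decimal branch above for the vectorized path: a file decimal(p1, s1) may be read as decimal(p2, s2) when the scale does not shrink and the number of integer digits does not shrink. A Scala paraphrase of that condition (the helper name is mine, not from the patch):

    // Mirrors the vectorized-reader condition in isDecimalTypeMatched:
    // fileP/fileS come from the Parquet logical type, readP/readS from the
    // Spark read schema.
    def decimalWideningOk(fileP: Int, fileS: Int, readP: Int, readS: Int): Boolean =
      fileS <= readS && (fileP - fileS) <= (readP - readS)

    assert(decimalWideningOk(5, 2, 7, 2))  // decimal(5,2) -> decimal(7,2): wider precision
    assert(decimalWideningOk(5, 2, 7, 4))  // scale grows, integer digits stay at 3
    assert(!decimalWideningOk(5, 2, 6, 4)) // integer digits shrink from 3 to 2: rejected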
common/src/main/java/org/apache/comet/parquet/Utils.java (13 additions, 2 deletions)
@@ -115,7 +115,7 @@ public static long initColumnReader(
       promotionInfo = new TypePromotionInfo(readType);
     } else {
       // If type promotion is not enable, we'll just use the Parquet primitive type and precision.
-      promotionInfo = new TypePromotionInfo(primitiveTypeId, precision, scale);
+      promotionInfo = new TypePromotionInfo(primitiveTypeId, precision, scale, bitWidth);
     }

     return Native.initColumnReader(
@@ -126,6 +126,7 @@ public static long initColumnReader(
         descriptor.getMaxDefinitionLevel(),
         descriptor.getMaxRepetitionLevel(),
         bitWidth,
+        promotionInfo.bitWidth,
         isSigned,
         primitiveType.getTypeLength(),
         precision,
@@ -147,11 +148,14 @@ static class TypePromotionInfo {
     int precision;
     // Decimal scale from the Spark read schema, or -1 if it's not decimal type.
     int scale;
+    // Integer bit width from the Spark read schema, or -1 if it's not integer type.
+    int bitWidth;

-    TypePromotionInfo(int physicalTypeId, int precision, int scale) {
+    TypePromotionInfo(int physicalTypeId, int precision, int scale, int bitWidth) {
       this.physicalTypeId = physicalTypeId;
       this.precision = precision;
       this.scale = scale;
+      this.bitWidth = bitWidth;
     }

     TypePromotionInfo(DataType sparkReadType) {
@@ -164,15 +168,22 @@ static class TypePromotionInfo {
       LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
       int precision = -1;
       int scale = -1;
+      int bitWidth = -1;
       if (annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
         LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalAnnotation =
             (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) annotation;
         precision = decimalAnnotation.getPrecision();
         scale = decimalAnnotation.getScale();
       }
+      if (annotation instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+        LogicalTypeAnnotation.IntLogicalTypeAnnotation intAnnotation =
+            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) annotation;
+        bitWidth = intAnnotation.getBitWidth();
+      }
       this.physicalTypeId = physicalTypeId;
       this.precision = precision;
       this.scale = scale;
+      this.bitWidth = bitWidth;
     }
   }
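The new bitWidth field is read from parquet-mr's IntLogicalTypeAnnotation, as the second constructor above shows. A self-contained Scala sketch of the same extraction (the column name and 16-bit width are illustrative):

    import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType, Types}
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    import org.apache.parquet.schema.Type.Repetition

    // Build an INT32 column annotated as a signed 16-bit integer, then pull
    // the bit width back out the way TypePromotionInfo does.
    val primitive: PrimitiveType = Types
      .primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
      .as(LogicalTypeAnnotation.intType(16, true))
      .named("a")

    val bitWidth = primitive.getLogicalTypeAnnotation match {
      case i: LogicalTypeAnnotation.IntLogicalTypeAnnotation => i.getBitWidth
      case _ => -1 // same -1 sentinel the Java code uses for non-integer types
    }
    assert(bitWidth == 16)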
dev/diffs/4.0.0-preview1.diff (30 additions, 19 deletions)
@@ -2012,28 +2012,39 @@ index 25f6af1cc33..37b40cb5524 100644
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
val expectedMessage = "Encountered error while reading file"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
-index 4bd35e0789b..6bfedb65078 100644
+index 4bd35e0789b..6544d86dbe0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}

import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, IgnoreCometSuite, QueryTest, Row}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.col
@@ -38,7 +38,8 @@ class ParquetTypeWideningSuite
extends QueryTest
with ParquetTest
with SharedSparkSession
- with AdaptiveSparkPlanHelper {
+ with AdaptiveSparkPlanHelper
+ with IgnoreCometSuite { // TODO: https://github.com/apache/datafusion-comet/issues/551

import testImplicits._
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
withClue(
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
s"'$timestampRebaseMode''") {
- withAllParquetWriters {
+ // TODO: Comet cannot read DELTA_BINARY_PACKED created by V2 writer
+ // https://github.com/apache/datafusion-comet/issues/574
+ // withAllParquetWriters {
withTempDir { dir =>
val expected =
writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode)
@@ -86,7 +88,7 @@ class ParquetTypeWideningSuite
}
}
}
- }
+ // }
}
}

@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite
(Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
(Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
(Seq("1.23", "10.34"), FloatType, DoubleType),
- (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType)
+ // TODO: Comet cannot handle older than "1582-10-15"
+ (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, TimestampNTZType)
)
}
test(s"parquet widening conversion $fromType -> $toType") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index b8f3ea3c6f3..bbd44221288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
native/core/benches/parquet_read.rs (2 additions, 2 deletions)
@@ -54,10 +54,10 @@ fn bench(c: &mut Criterion) {
         );
         b.iter(|| {
             let cd = ColumnDescriptor::new(t.clone(), 0, 0, ColumnPath::from(Vec::new()));
-            let promition_info = TypePromotionInfo::new(PhysicalType::INT32, -1, -1);
+            let promotion_info = TypePromotionInfo::new(PhysicalType::INT32, -1, -1, 32);
             let mut column_reader = TestColumnReader::new(
                 cd,
-                promition_info,
+                promotion_info,
                 BATCH_SIZE,
                 pages.clone(),
                 expected_num_values,
native/core/src/parquet/data_type.rs (5 additions, 0 deletions)
@@ -30,11 +30,15 @@ make_type!(BoolType);
 make_type!(Int8Type);
 make_type!(UInt8Type);
 make_type!(Int16Type);
+make_type!(Int16ToDoubleType);
 make_type!(UInt16Type);
 make_type!(Int32Type);
 make_type!(Int32To64Type);
+make_type!(Int32ToDecimal64Type);
+make_type!(Int32ToDoubleType);
 make_type!(UInt32Type);
 make_type!(Int64Type);
+make_type!(Int64ToDecimal64Type);
 make_type!(UInt64Type);
 make_type!(FloatType);
 make_type!(DoubleType);
@@ -48,6 +52,7 @@ make_type!(FLBADecimal32Type);
 make_type!(FLBADecimal64Type);
 make_type!(FLBAType);
 make_type!(Int32DateType);
+make_type!(Int32TimestampMicrosType);
Review thread on make_type!(Int32TimestampMicrosType);
Contributor: should we have Int32TimestampMillisType as well?
Author: We don't have a usage for millis for now, I think.
 make_type!(Int64TimestampMillisType);
 make_type!(Int64TimestampMicrosType);
 make_type!(Int96TimestampMicrosType);
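Each new make_type! entry names a physical-to-read promotion the native reader can now perform. A spark-shell-style sketch of reads that would exercise two of them (assuming Comet is enabled; session setup and paths are my assumptions):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    Seq(1, 2, 3).toDF("a").write.mode("overwrite").parquet("/tmp/widen_i32")

    // Int32ToDoubleType: INT32 data widened to DoubleType on read.
    spark.read.schema("a DOUBLE").parquet("/tmp/widen_i32").show()

    // Int32ToDecimal64Type: INT32 widened to a decimal backed by 64 bits; per
    // the TypeUtil check, the read decimal needs at least 10 integer digits.
    spark.read.schema("a DECIMAL(18,0)").parquet("/tmp/widen_i32").show()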
native/core/src/parquet/mod.rs (7 additions, 2 deletions)
@@ -67,6 +67,7 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
     max_dl: jint,
     max_rl: jint,
     bit_width: jint,
+    read_bit_width: jint,
     is_signed: jboolean,
     type_length: jint,
     precision: jint,
@@ -95,8 +96,12 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
         is_adjusted_utc,
         jni_path,
     )?;
-    let promotion_info =
-        TypePromotionInfo::new_from_jni(read_primitive_type, read_precision, read_scale);
+    let promotion_info = TypePromotionInfo::new_from_jni(
+        read_primitive_type,
+        read_precision,
+        read_scale,
+        read_bit_width,
+    );
     let ctx = Context {
         column_reader: ColumnReader::get(
             desc,