[SPARK-47563][SQL] Add map normalization on creation #45721
Conversation
case FloatType => NormalizeFloatingNumbers.FLOAT_NORMALIZER(value)
case DoubleType => NormalizeFloatingNumbers.DOUBLE_NORMALIZER(value)
case ArrayType(dt, _) =>
  new GenericArrayData(value.asInstanceOf[GenericArrayData].array.map { element =>
If we have an array of 1 million strings, we will go through each value even though we know we don't need to normalize strings. What about doing the same as in NormalizeFloatingNumbers and first checking whether we need to perform normalization?
Applied NormalizeFloatingNumbers.needNormalize here.
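The "check before you walk" idea behind needNormalize can be sketched outside of Spark. The point is to decide once, from the element type alone, whether normalization can ever matter, so an array of a million strings is never traversed. This is a toy illustration in Java (the PR itself is Scala); the string-encoded type descriptions and the needNormalize name here are hypothetical stand-ins, not Spark's actual API:

```java
public class NeedNormalize {
    // Hypothetical minimal type descriptors: "float", "double", "string",
    // or a nested "array<elem>" encoded with a prefix and suffix.
    static boolean needNormalize(String dataType) {
        if (dataType.equals("float") || dataType.equals("double")) {
            return true; // floating point: -0.0 vs 0.0 (and NaN bit patterns) can differ
        }
        if (dataType.startsWith("array<") && dataType.endsWith(">")) {
            // Recurse into the element type; the array itself adds nothing.
            return needNormalize(dataType.substring(6, dataType.length() - 1));
        }
        return false; // strings, integers, ... never need normalization
    }

    public static void main(String[] args) {
        System.out.println(needNormalize("array<string>")); // false: skip the scan entirely
        System.out.println(needNormalize("array<double>")); // true: each element must be normalized
    }
}
```

With this guard, the per-element map over the array body runs only when the element type can actually contain a float or double.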
    "mapKeyDedupPolicy" -> "\"spark.sql.mapKeyDedupPolicy\"")
)

val builderStruct = new ArrayBasedMapBuilder(new StructType().add("i", "double"), IntegerType)
Maybe add a case where an array is inside a struct.
@@ -60,6 +60,40 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
  )
}

test("apply key normalization when creating") {
Add another test for successful normalization. Please also add info in the description on why the change is needed, i.e. right now we can create a map with both keys -0.0 and 0.0.
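The -0.0 vs 0.0 problem the reviewer mentions comes from how boxed doubles behave on the JVM: `0.0 == -0.0` is true, but `Double.equals` and `Double.hashCode` go through the raw bit pattern, so a hash-based map happily keeps both as distinct keys. A minimal Java sketch (the normalize helper here is a simplified stand-in for what Spark's DOUBLE_NORMALIZER does, not its actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class MapKeyNormalization {
    // Simplified stand-in for Spark's double normalizer:
    // collapse -0.0 into 0.0 so it cannot survive as a separate key.
    // (d == 0.0d) is true for both 0.0 and -0.0.
    static double normalize(double d) {
        if (Double.isNaN(d)) return Double.NaN;
        if (d == 0.0d) return 0.0d;
        return d;
    }

    public static void main(String[] args) {
        Map<Double, Integer> raw = new HashMap<>();
        raw.put(0.0, 1);
        raw.put(-0.0, 2);   // Double.equals distinguishes 0.0 from -0.0

        Map<Double, Integer> normalized = new HashMap<>();
        normalized.put(normalize(0.0), 1);
        normalized.put(normalize(-0.0), 2);   // overwrites the 0.0 entry

        System.out.println(raw.size());        // prints 2: duplicate keys!
        System.out.println(normalized.size()); // prints 1: keys deduplicated
    }
}
```

This is exactly the `Map(0.0, -0.0)` case the PR description cites: without normalization at insertion time, the builder's hash-based dedup never sees the two keys as equal.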
case StructType(sf) =>
  new GenericInternalRow(
    value.asInstanceOf[GenericInternalRow].values.zipWithIndex.map { element =>
      normalize(element._1, sf(element._2).dataType)
You could also check whether you need to do normalization here, right? This way we would avoid normalizing all fields of a struct when only one of them actually needs it.
As noted by @cloud-fan below, complex types have been dropped.
@@ -52,18 +54,36 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria

private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)

private lazy val keyNeedNormalize = NormalizeFloatingNumbers.needNormalize(keyType)

def normalize(value: Any, dataType: DataType): Any = dataType match {
We should return a lambda function that does normalization based on the data type, instead of matching on the data type per row.
def normalize(value: Any, dataType: DataType): Any = dataType match {
  case FloatType => NormalizeFloatingNumbers.FLOAT_NORMALIZER(value)
  case DoubleType => NormalizeFloatingNumbers.DOUBLE_NORMALIZER(value)
  case ArrayType(dt, _) =>
No need to handle complex types, as we use a TreeMap for complex types, which should handle floating points well.
private lazy val keyNeedNormalize =
  keyType.isInstanceOf[FloatType] || keyType.isInstanceOf[DoubleType]

def normalize(dataType: DataType): Any => Any = dataType match {
private lazy val keyNormalizer: Any => Any = keyType match {
  case FloatType => NormalizeFloatingNumbers.FLOAT_NORMALIZER
  case DoubleType => NormalizeFloatingNumbers.DOUBLE_NORMALIZER
  case _ => identity
}

Then we can just write:

val keyNormalized = keyNormalizer(key)
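The design point of the suggestion above is to select the normalizer function once, when the builder is constructed from the key type, so that the per-row path is a single function call with no type dispatch. A rough Java analogue (the type strings and normalizerFor name are hypothetical illustration, not Spark's API):

```java
import java.util.function.DoubleUnaryOperator;

public class NormalizerSelection {
    // Hypothetical stand-in for Spark's DOUBLE_NORMALIZER:
    // collapse -0.0 into 0.0 so it cannot appear as a distinct map key.
    static final DoubleUnaryOperator DOUBLE_NORMALIZER = d -> (d == 0.0d) ? 0.0d : d;
    static final DoubleUnaryOperator IDENTITY = d -> d;

    // Pick the normalizer ONCE from the key type, instead of matching on the
    // type again for every row that goes through the builder.
    static DoubleUnaryOperator normalizerFor(String keyType) {
        return keyType.equals("double") ? DOUBLE_NORMALIZER : IDENTITY;
    }

    public static void main(String[] args) {
        DoubleUnaryOperator keyNormalizer = normalizerFor("double");
        double normalized = keyNormalizer.applyAsDouble(-0.0);
        // -0.0 and 0.0 compare equal with ==, so compare raw bit patterns
        // to show the sign bit was actually cleared:
        System.out.println(Double.doubleToLongBits(normalized)
            == Double.doubleToLongBits(0.0)); // prints "true"
    }
}
```

In the merged Scala code this is what the `lazy val keyNormalizer: Any => Any` achieves: the match on keyType runs at most once per builder instance, and each key insertion is just `keyNormalizer(key)`.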
thanks, merging to master!
### What changes were proposed in this pull request?
Added normalization of map keys when they are put in `ArrayBasedMapBuilder`.

### Why are the changes needed?
As map keys need to be unique, we need to add normalization on floating point numbers and prevent the following case when building a map: `Map(0.0, -0.0)`. This further unblocks the GROUP BY statement for map types as per [this discussion](apache#45549 (comment)).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UTs in `ArrayBasedMapBuilderSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45721 from stevomitric/stevomitric/fix-map-dup.

Authored-by: Stevo Mitric <stevo.mitric@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>