
Commit ab186e3

xuanyuanking authored and cloud-fan committed
[SPARK-25829][SQL] Add config spark.sql.legacy.allowDuplicatedMapKeys and change the default behavior
### What changes were proposed in this pull request?

This is a follow-up for #23124: add a new config `spark.sql.legacy.allowDuplicatedMapKeys` to control the behavior of removing duplicated map keys in built-in functions. With the default value `false`, Spark throws a RuntimeException when duplicated keys are found.

### Why are the changes needed?

Prevent silent behavior changes.

### Does this PR introduce any user-facing change?

Yes, a new config is added, and the default behavior for duplicated map keys is changed to throwing a RuntimeException.

### How was this patch tested?

Modified existing UTs.

Closes #27478 from xuanyuanking/SPARK-25892-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 5b87342 commit ab186e3

File tree

10 files changed: +136, -80 lines


docs/sql-migration-guide.md

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ license: |
 
   - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.
 
-  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be undefined.
+  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, the new config `spark.sql.legacy.allowDuplicatedMapKeys` was added, with the default value `false`, and Spark will throw a RuntimeException when duplicated keys are found. If set to `true`, these built-in functions will remove duplicated map keys with the last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet); the behavior will be undefined.
 
   - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
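Concretely, the behavior described above plays out like this (a minimal sketch, assuming a Spark 3.0 `SparkSession` named `spark`; the SQL `map` function is backed by `CreateMap`, one of the affected built-ins):

```scala
// Default (spark.sql.legacy.allowDuplicatedMapKeys=false): duplicated keys fail fast.
// spark.sql("SELECT map(1, 'a', 1, 'b')").collect()
//   => java.lang.RuntimeException: Duplicate map key 1 was founded, ...

// Opting back into the Spark 2.4 behavior: duplicates are removed, last one wins.
spark.conf.set("spark.sql.legacy.allowDuplicatedMapKeys", "true")
spark.sql("SELECT map(1, 'a', 1, 'b') AS m").show(truncate = false)
// +--------+
// |m       |
// +--------+
// |[1 -> b]|
// +--------+
```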

python/pyspark/sql/functions.py

Lines changed: 2 additions & 2 deletions
@@ -2766,12 +2766,12 @@ def map_concat(*cols):
     :param cols: list of column names (string) or list of :class:`Column` expressions
 
     >>> from pyspark.sql.functions import map_concat
-    >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c', 1, 'd') as map2")
+    >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c') as map2")
     >>> df.select(map_concat("map1", "map2").alias("map3")).show(truncate=False)
     +------------------------+
     |map3                    |
     +------------------------+
-    |[1 -> d, 2 -> b, 3 -> c]|
+    |[1 -> a, 2 -> b, 3 -> c]|
     +------------------------+
     """
     sc = SparkContext._active_spark_context
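The dropped overlapping-keys doctest still works when the legacy flag is on; a Scala sketch of both modes (assuming a `SparkSession` named `spark`):

```scala
val df = spark.sql("SELECT map(1, 'a', 2, 'b') AS map1, map(3, 'c', 1, 'd') AS map2")

// New default: the duplicated key 1 makes map_concat throw a RuntimeException.
// df.selectExpr("map_concat(map1, map2)").show()

// The legacy flag restores the old last-wins result.
spark.conf.set("spark.sql.legacy.allowDuplicatedMapKeys", "true")
df.selectExpr("map_concat(map1, map2) AS map3").show(truncate = false)
// |[1 -> d, 2 -> b, 3 -> c]|
```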

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala

Lines changed: 2 additions & 2 deletions
@@ -516,8 +516,8 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
   usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
   examples = """
     Examples:
-      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
-       {1:"a",2:"c",3:"d"}
+      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(3, 'c'));
+       {1:"a",2:"b",3:"c"}
   """, since = "2.4.0")
 case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala

Lines changed: 10 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods
 
@@ -47,6 +49,9 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
+  private val allowDuplicatedMapKey =
+    SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY)
+
   def put(key: Any, value: Any): Unit = {
     if (key == null) {
       throw new RuntimeException("Cannot use null as map key.")
@@ -62,6 +67,11 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
       keys.append(key)
       values.append(value)
     } else {
+      if (!allowDuplicatedMapKey) {
+        throw new RuntimeException(s"Duplicate map key $key was founded, please check the input " +
+          "data. If you want to remove the duplicated keys with last-win policy, you can set " +
+          s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.")
+      }
       // Overwrite the previous value, as the policy is last wins.
       values(index) = value
     }
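The heart of the change is the guard in `put`. For illustration, a self-contained sketch of the same pattern in plain Scala (not the actual Spark class: the config lookup is replaced by a constructor flag, and the real builder additionally handles complex key types such as the binary, struct and array keys exercised in the test suite below):

```scala
import scala.collection.mutable

class MapBuilderSketch[K, V](allowDuplicatedMapKey: Boolean) {
  private val keyToIndex = mutable.HashMap.empty[K, Int]
  private val keys = mutable.ArrayBuffer.empty[K]
  private val values = mutable.ArrayBuffer.empty[V]

  def put(key: K, value: V): Unit = keyToIndex.get(key) match {
    case None =>
      keyToIndex(key) = keys.length   // first occurrence of this key
      keys += key
      values += value
    case Some(index) if allowDuplicatedMapKey =>
      values(index) = value           // legacy behavior: last wins
    case Some(_) =>
      throw new RuntimeException(s"Duplicate map key $key was found.")
  }

  def build(): Map[K, V] = keys.zip(values).toMap
}
```

For example, `new MapBuilderSketch[Int, Int](false)` throws on a second `put(1, _)`, mirroring the new default, while passing `true` reproduces the old last-wins result.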

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
@@ -2188,6 +2188,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_ALLOW_DUPLICATED_MAP_KEY =
+    buildConf("spark.sql.legacy.allowDuplicatedMapKeys")
+      .doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " +
+        "this config takes effect in below build-in functions: CreateMap, MapFromArrays, " +
+        "MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " +
+        "which is the default, Spark will throw an exception when duplicated map keys are " +
+        "detected.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
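Since this is an ordinary SQL config, it can be flipped per session; for example (assuming a `SparkSession` named `spark`):

```scala
// The flag defaults to false; enabling it restores the last-wins behavior
// for the built-in functions listed in the doc string above.
spark.sql("SET spark.sql.legacy.allowDuplicatedMapKeys=true")
assert(spark.conf.get("spark.sql.legacy.allowDuplicatedMapKeys") == "true")
```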

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala

Lines changed: 12 additions & 6 deletions
@@ -139,8 +139,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
       MapType(IntegerType, IntegerType, valueContainsNull = true))
     val mNull = Literal.create(null, MapType(StringType, StringType))
 
-    // overlapping maps should remove duplicated map keys w.r.t. last win policy.
-    checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3"))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // overlapping maps should remove duplicated map keys w.r.t. last win policy.
+      checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3"))
+    }
 
     // maps with no overlap
     checkEvaluation(MapConcat(Seq(m0, m2)),
@@ -272,8 +274,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
     checkEvaluation(MapFromEntries(ai2), Map.empty)
     checkEvaluation(MapFromEntries(ai3), null)
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20))
+    }
     // Map key can't be null
     checkExceptionInExpression[RuntimeException](
       MapFromEntries(ai5),
@@ -294,8 +298,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
     checkEvaluation(MapFromEntries(as2), Map.empty)
     checkEvaluation(MapFromEntries(as3), null)
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb"))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb"))
+    }
     // Map key can't be null
     checkExceptionInExpression[RuntimeException](
       MapFromEntries(as5),
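The tests wrap the legacy expectations in `withSQLConf`, a test helper that sets the given config entries for the duration of a block and restores the previous values afterwards. Roughly, the idea is (a sketch, not Spark's exact implementation in `SQLHelper`):

```scala
def withSQLConf[T](pairs: (String, String)*)(body: => T): T = {
  val conf = SQLConf.get
  // Remember the previous value (if any) of every key we are about to override.
  val saved = pairs.map { case (key, _) =>
    key -> (if (conf.contains(key)) Some(conf.getConfString(key)) else None)
  }
  pairs.foreach { case (key, value) => conf.setConfString(key, value) }
  try body finally saved.foreach {
    case (key, Some(old)) => conf.setConfString(key, old)
    case (key, None) => conf.unsetConf(key)
  }
}
```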

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala

Lines changed: 21 additions & 14 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -216,10 +217,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))),
       "Cannot use null as map key")
 
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(
-      CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))),
-      create_map(1 -> 3))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(
+        CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))),
+        create_map(1 -> 3))
+    }
 
     // ArrayType map key and value
     val map = CreateMap(Seq(
@@ -281,12 +284,14 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       MapFromArrays(intWithNullArray, strArray),
       "Cannot use null as map key")
 
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(
-      MapFromArrays(
-        Literal.create(Seq(1, 1), ArrayType(IntegerType)),
-        Literal.create(Seq(2, 3), ArrayType(IntegerType))),
-      create_map(1 -> 3))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(
+        MapFromArrays(
+          Literal.create(Seq(1, 1), ArrayType(IntegerType)),
+          Literal.create(Seq(2, 3), ArrayType(IntegerType))),
+        create_map(1 -> 3))
+    }
 
     // map key can't be map
     val arrayOfMap = Seq(create_map(1 -> "a", 2 -> "b"))
@@ -399,10 +404,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
     val m5 = Map("a" -> null)
     checkEvaluation(new StringToMap(s5), m5)
 
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(
-      new StringToMap(Literal("a:1,b:2,a:3")),
-      create_map("a" -> "3", "b" -> "2"))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(
+        new StringToMap(Literal("a:1,b:2,a:3")),
+        create_map("a" -> "3", "b" -> "2"))
+    }
 
     // arguments checking
     assert(new StringToMap(Literal("a:1,b:2,c:3")).checkInputDataTypes().isSuccess)
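The `StringToMap` case is easy to reproduce from plain SQL via `str_to_map` (a sketch, assuming a `SparkSession` named `spark`):

```scala
// New default: the repeated key 'a' in the parsed string throws.
// spark.sql("SELECT str_to_map('a:1,b:2,a:3')").collect()
//   => java.lang.RuntimeException: Duplicate map key a was founded, ...

// Legacy mode: the last occurrence wins, matching the expectation above.
spark.conf.set("spark.sql.legacy.allowDuplicatedMapKeys", "true")
spark.sql("SELECT str_to_map('a:1,b:2,a:3') AS m").show(truncate = false)
// |[a -> 3, b -> 2]|
```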

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -465,8 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(
       transformKeys(transformKeys(ai0, plusOne), plusValue),
       create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4))
-    // Duplicated map keys will be removed w.r.t. the last wins policy.
-    checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      // Duplicated map keys will be removed w.r.t. the last wins policy.
+      checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3))
+    }
     checkEvaluation(transformKeys(ai1, plusOne), Map.empty[Int, Int])
     checkEvaluation(transformKeys(ai1, plusOne), Map.empty[Int, Int])
     checkEvaluation(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala

Lines changed: 64 additions & 47 deletions
@@ -20,10 +20,12 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType, StructType}
 import org.apache.spark.unsafe.Platform
 
-class ArrayBasedMapBuilderSuite extends SparkFunSuite {
+class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
 
   test("basic") {
     val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
@@ -42,64 +44,79 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite {
     assert(e.getMessage.contains("Cannot use null as map key"))
   }
 
-  test("remove duplicated keys with last wins policy") {
+  test("fail while duplicated keys detected") {
     val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
     builder.put(1, 1)
-    builder.put(2, 2)
-    builder.put(1, 2)
-    val map = builder.build()
-    assert(map.numElements() == 2)
-    assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2))
+    val e = intercept[RuntimeException](builder.put(1, 2))
+    assert(e.getMessage.contains("Duplicate map key 1 was founded"))
+  }
+
+  test("remove duplicated keys with last wins policy") {
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+      builder.put(1, 1)
+      builder.put(2, 2)
+      builder.put(1, 2)
+      val map = builder.build()
+      assert(map.numElements() == 2)
+      assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2))
+    }
   }
 
   test("binary type key") {
-    val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
-    builder.put(Array(1.toByte), 1)
-    builder.put(Array(2.toByte), 2)
-    builder.put(Array(1.toByte), 3)
-    val map = builder.build()
-    assert(map.numElements() == 2)
-    val entries = ArrayBasedMapData.toScalaMap(map).iterator.toSeq
-    assert(entries(0)._1.asInstanceOf[Array[Byte]].toSeq == Seq(1))
-    assert(entries(0)._2 == 3)
-    assert(entries(1)._1.asInstanceOf[Array[Byte]].toSeq == Seq(2))
-    assert(entries(1)._2 == 2)
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
+      builder.put(Array(1.toByte), 1)
+      builder.put(Array(2.toByte), 2)
+      builder.put(Array(1.toByte), 3)
+      val map = builder.build()
+      assert(map.numElements() == 2)
+      val entries = ArrayBasedMapData.toScalaMap(map).iterator.toSeq
+      assert(entries(0)._1.asInstanceOf[Array[Byte]].toSeq == Seq(1))
+      assert(entries(0)._2 == 3)
+      assert(entries(1)._1.asInstanceOf[Array[Byte]].toSeq == Seq(2))
      assert(entries(1)._2 == 2)
+    }
   }
 
   test("struct type key") {
-    val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
-    builder.put(InternalRow(1), 1)
-    builder.put(InternalRow(2), 2)
-    val unsafeRow = {
-      val row = new UnsafeRow(1)
-      val bytes = new Array[Byte](16)
-      row.pointTo(bytes, 16)
-      row.setInt(0, 1)
-      row
-    }
-    builder.put(unsafeRow, 3)
-    val map = builder.build()
-    assert(map.numElements() == 2)
-    assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
+      builder.put(InternalRow(1), 1)
+      builder.put(InternalRow(2), 2)
+      val unsafeRow = {
+        val row = new UnsafeRow(1)
+        val bytes = new Array[Byte](16)
+        row.pointTo(bytes, 16)
+        row.setInt(0, 1)
+        row
+      }
+      builder.put(unsafeRow, 3)
+      val map = builder.build()
+      assert(map.numElements() == 2)
+      assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
+    }
   }
 
   test("array type key") {
-    val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
-    builder.put(new GenericArrayData(Seq(1, 1)), 1)
-    builder.put(new GenericArrayData(Seq(2, 2)), 2)
-    val unsafeArray = {
-      val array = new UnsafeArrayData()
-      val bytes = new Array[Byte](24)
-      Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
-      array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
-      array.setInt(0, 1)
-      array.setInt(1, 1)
-      array
-    }
-    builder.put(unsafeArray, 3)
-    val map = builder.build()
-    assert(map.numElements() == 2)
-    assert(ArrayBasedMapData.toScalaMap(map) ==
-      Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
+      builder.put(new GenericArrayData(Seq(1, 1)), 1)
+      builder.put(new GenericArrayData(Seq(2, 2)), 2)
+      val unsafeArray = {
+        val array = new UnsafeArrayData()
+        val bytes = new Array[Byte](24)
+        Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
+        array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
        array.setInt(0, 1)
+        array.setInt(1, 1)
+        array
+      }
+      builder.put(unsafeArray, 3)
+      val map = builder.build()
+      assert(map.numElements() == 2)
+      assert(ArrayBasedMapData.toScalaMap(map) ==
+        Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2))
    }
  }
 }

sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala

Lines changed: 10 additions & 6 deletions
@@ -651,8 +651,10 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(null)
     )
 
-    checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a)
-    checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a)
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a)
+      checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a)
+    }
 
     val expected1b = Seq(
       Row(Map(1 -> 100, 2 -> 200)),
@@ -3068,11 +3070,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)),
       Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7))))
 
-    checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"),
-      Seq(Row(Map(true -> true, true -> false))))
+    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"),
+        Seq(Row(Map(true -> true, true -> false))))
 
-    checkAnswer(dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)),
-      Seq(Row(Map(true -> true, true -> false))))
+      checkAnswer(dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)),
+        Seq(Row(Map(true -> true, true -> false))))
+    }
 
     checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 2 * k, 3 * k))"),
       Seq(Row(Map(50 -> true, 78 -> false))))
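`transform_keys` needs the flag because a key-mapping lambda can itself introduce collisions even when the input map has distinct keys. A DataFrame-level sketch (assuming a `SparkSession` named `spark` with its implicits in scope):

```scala
import org.apache.spark.sql.functions.{col, transform_keys}
import spark.implicits._

val df = Seq(Map(1 -> "a", 3 -> "b")).toDF("m")

// Both keys map to 1 under k % 2, so the new default throws a RuntimeException:
// df.select(transform_keys(col("m"), (k, v) => k % 2)).show()

// With the legacy flag, the collision is silently resolved last-wins:
spark.conf.set("spark.sql.legacy.allowDuplicatedMapKeys", "true")
df.select(transform_keys(col("m"), (k, v) => k % 2).as("m2")).show(truncate = false)
// |[1 -> b]|
```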
