Commit fb5697d
[SPARK-48658][SQL] Encode/Decode functions report coding errors instead of mojibake for unmappable characters
### What changes were proposed in this pull request?

This PR makes the encode/decode functions report coding errors instead of producing mojibake for unmappable characters. Take `select encode('渭城朝雨浥轻尘', 'US-ASCII')` as an example.

Before this PR:

```sql
???????
```

After this PR:

```json
org.apache.spark.SparkRuntimeException
{
  "errorClass" : "MALFORMED_CHARACTER_CODING",
  "sqlState" : "22000",
  "messageParameters" : {
    "charset" : "US-ASCII",
    "function" : "`encode`"
  }
}
```

### Why are the changes needed?

Improve data quality.

### Does this PR introduce _any_ user-facing change?

Yes. When spark.sql.legacy.codingErrorAction is set to true, the encode/decode functions replace unmappable characters with mojibake instead of reporting coding errors.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47017 from yaooqinn/SPARK-48658.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent 8e02a64 commit fb5697d
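
To see the behavioral change end to end, here is a minimal spark-shell sketch; the `spark` session and the exact exception surface are assumptions for illustration, not output captured from this patch.

```scala
import org.apache.spark.SparkRuntimeException

// '渭城朝雨浥轻尘' has no US-ASCII mapping, so encode now fails loudly
// instead of returning a row of '?' bytes.
try {
  spark.sql("SELECT encode('渭城朝雨浥轻尘', 'US-ASCII')").collect()
} catch {
  case e: SparkRuntimeException =>
    println(e.getErrorClass) // expected: MALFORMED_CHARACTER_CODING
}
```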

File tree

18 files changed: +628 -134 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions

@@ -3006,6 +3006,12 @@
     ],
     "sqlState" : "42710"
   },
+  "MALFORMED_CHARACTER_CODING" : {
+    "message" : [
+      "Invalid value found when performing <function> with <charset>"
+    ],
+    "sqlState" : "22000"
+  },
   "MALFORMED_CSV_RECORD" : {
     "message" : [
       "Malformed CSV record: <badRecord>"
Lines changed: 1 addition & 1 deletion

@@ -1,2 +1,2 @@
-Project [decode(cast(g#0 as binary), UTF-8, false) AS decode(g, UTF-8)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.StringDecode, StringType, decode, cast(g#0 as binary), UTF-8, false, false, BinaryType, StringTypeAnyCollation, BooleanType, BooleanType, true, true, true) AS decode(g, UTF-8)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion

@@ -1,2 +1,2 @@
-Project [encode(g#0, UTF-8, false) AS encode(g, UTF-8)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.Encode, BinaryType, encode, g#0, UTF-8, false, false, StringTypeAnyCollation, StringTypeAnyCollation, BooleanType, BooleanType, true, true, true) AS encode(g, UTF-8)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion

@@ -1,2 +1,2 @@
-Project [encode(g#0, UTF-8, false) AS to_binary(g, utf-8)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.Encode, BinaryType, encode, g#0, UTF-8, false, false, StringTypeAnyCollation, StringTypeAnyCollation, BooleanType, BooleanType, true, true, true) AS to_binary(g, utf-8)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
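
The three golden-plan updates above all reflect the same change: encode/decode are now RuntimeReplaceable, so the optimized plan shows a staticinvoke call instead of dedicated expression nodes. A hedged way to observe this locally (assumed `spark` session, illustrative column):

```scala
import org.apache.spark.sql.functions.{col, encode}

// After this patch, the explain output should contain
// `staticinvoke(class ...Encode, ...)` where it previously showed `encode(...)`.
spark.range(1)
  .select(encode(col("id").cast("string"), "UTF-8"))
  .explain(true)
```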

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

Lines changed: 112 additions & 86 deletions

@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.UnsupportedEncodingException
+import java.nio.{ByteBuffer, CharBuffer}
+import java.nio.charset.{CharacterCodingException, Charset, CodingErrorAction, IllegalCharsetNameException, UnsupportedCharsetException}
 import java.text.{BreakIterator, DecimalFormat, DecimalFormatSymbols}
 import java.util.{Base64 => JBase64}
 import java.util.{HashMap, Locale, Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.QueryContext
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, FunctionRegistry, TypeCheckResult}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -2716,62 +2718,69 @@ case class Decode(params: Seq[Expression], replacement: Expression)
   since = "1.5.0",
   group = "string_funcs")
 // scalastyle:on line.size.limit
-case class StringDecode(bin: Expression, charset: Expression, legacyCharsets: Boolean)
-  extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
+case class StringDecode(
+    bin: Expression,
+    charset: Expression,
+    legacyCharsets: Boolean,
+    legacyErrorAction: Boolean)
+  extends RuntimeReplaceable with ImplicitCastInputTypes {
 
   def this(bin: Expression, charset: Expression) =
-    this(bin, charset, SQLConf.get.legacyJavaCharsets)
+    this(bin, charset, SQLConf.get.legacyJavaCharsets, SQLConf.get.legacyCodingErrorAction)
 
-  override def left: Expression = bin
-  override def right: Expression = charset
   override def dataType: DataType = SQLConf.get.defaultStringType
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, StringTypeAnyCollation)
+  override def prettyName: String = "decode"
+  override def toString: String = s"$prettyName($bin, $charset)"
 
-  private val supportedCharsets = Set(
-    "US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16", "UTF-32")
-
-  protected override def nullSafeEval(input1: Any, input2: Any): Any = {
-    val fromCharset = input2.asInstanceOf[UTF8String].toString
-    try {
-      if (legacyCharsets || supportedCharsets.contains(fromCharset.toUpperCase(Locale.ROOT))) {
-        UTF8String.fromString(new String(input1.asInstanceOf[Array[Byte]], fromCharset))
-      } else throw new UnsupportedEncodingException
-    } catch {
-      case _: UnsupportedEncodingException =>
-        throw QueryExecutionErrors.invalidCharsetError(prettyName, fromCharset)
-    }
-  }
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    nullSafeCodeGen(ctx, ev, (bytes, charset) => {
-      val fromCharset = ctx.freshName("fromCharset")
-      val sc = JavaCode.global(
-        ctx.addReferenceObj("supportedCharsets", supportedCharsets),
-        supportedCharsets.getClass)
-      s"""
-        String $fromCharset = $charset.toString();
-        try {
-          if ($legacyCharsets || $sc.contains($fromCharset.toUpperCase(java.util.Locale.ROOT))) {
-            ${ev.value} = UTF8String.fromString(new String($bytes, $fromCharset));
-          } else {
-            throw new java.io.UnsupportedEncodingException();
-          }
-        } catch (java.io.UnsupportedEncodingException e) {
-          throw QueryExecutionErrors.invalidCharsetError("$prettyName", $fromCharset);
-        }
-      """
-    })
-  }
-
-  override protected def withNewChildrenInternal(
-      newLeft: Expression, newRight: Expression): StringDecode =
-    copy(bin = newLeft, charset = newRight)
+  override def replacement: Expression = StaticInvoke(
+    classOf[StringDecode],
+    SQLConf.get.defaultStringType,
+    "decode",
+    Seq(bin, charset, Literal(legacyCharsets), Literal(legacyErrorAction)),
+    Seq(BinaryType, StringTypeAnyCollation, BooleanType, BooleanType))
 
-  override def prettyName: String = "decode"
+  override def children: Seq[Expression] = Seq(bin, charset)
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+    copy(bin = newChildren(0), charset = newChildren(1))
 }
 
 object StringDecode {
   def apply(bin: Expression, charset: Expression): StringDecode = new StringDecode(bin, charset)
+  def decode(
+      input: Array[Byte],
+      charset: UTF8String,
+      legacyCharsets: Boolean,
+      legacyErrorAction: Boolean): UTF8String = {
+    val fromCharset = charset.toString
+    if (legacyCharsets || Encode.VALID_CHARSETS.contains(fromCharset.toUpperCase(Locale.ROOT))) {
+      val decoder = try {
+        val codingErrorAction = if (legacyErrorAction) {
+          CodingErrorAction.REPLACE
+        } else {
+          CodingErrorAction.REPORT
+        }
+        Charset.forName(fromCharset)
+          .newDecoder()
+          .onMalformedInput(codingErrorAction)
+          .onUnmappableCharacter(codingErrorAction)
+      } catch {
+        case _: IllegalCharsetNameException |
+             _: UnsupportedCharsetException |
+             _: IllegalArgumentException =>
+          throw QueryExecutionErrors.invalidCharsetError("decode", fromCharset)
+      }
+      try {
+        val cb = decoder.decode(ByteBuffer.wrap(input))
+        UTF8String.fromString(cb.toString)
+      } catch {
+        case _: CharacterCodingException =>
+          throw QueryExecutionErrors.malformedCharacterCoding("decode", fromCharset)
+      }
+    } else {
+      throw QueryExecutionErrors.invalidCharsetError("decode", fromCharset)
+    }
+  }
 }
 
 /**
@@ -2793,59 +2802,76 @@ object StringDecode {
   since = "1.5.0",
   group = "string_funcs")
 // scalastyle:on line.size.limit
-case class Encode(str: Expression, charset: Expression, legacyCharsets: Boolean)
-  extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {
+case class Encode(
+    str: Expression,
+    charset: Expression,
+    legacyCharsets: Boolean,
+    legacyErrorAction: Boolean)
+  extends RuntimeReplaceable with ImplicitCastInputTypes {
 
   def this(value: Expression, charset: Expression) =
-    this(value, charset, SQLConf.get.legacyJavaCharsets)
+    this(value, charset, SQLConf.get.legacyJavaCharsets, SQLConf.get.legacyCodingErrorAction)
 
-  override def left: Expression = str
-  override def right: Expression = charset
   override def dataType: DataType = BinaryType
   override def inputTypes: Seq[AbstractDataType] =
     Seq(StringTypeAnyCollation, StringTypeAnyCollation)
 
-  private val supportedCharsets = Set(
-    "US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16", "UTF-32")
-
-  protected override def nullSafeEval(input1: Any, input2: Any): Any = {
-    val toCharset = input2.asInstanceOf[UTF8String].toString
-    try {
-      if (legacyCharsets || supportedCharsets.contains(toCharset.toUpperCase(Locale.ROOT))) {
-        input1.asInstanceOf[UTF8String].toString.getBytes(toCharset)
-      } else throw new UnsupportedEncodingException
-    } catch {
-      case _: UnsupportedEncodingException =>
-        throw QueryExecutionErrors.invalidCharsetError(prettyName, toCharset)
-    }
-  }
+  override val replacement: Expression = StaticInvoke(
+    classOf[Encode],
+    BinaryType,
+    "encode",
+    Seq(
+      str, charset, Literal(legacyCharsets, BooleanType), Literal(legacyErrorAction, BooleanType)),
+    Seq(StringTypeAnyCollation, StringTypeAnyCollation, BooleanType, BooleanType))
 
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    nullSafeCodeGen(ctx, ev, (string, charset) => {
-      val toCharset = ctx.freshName("toCharset")
-      val sc = JavaCode.global(
-        ctx.addReferenceObj("supportedCharsets", supportedCharsets),
-        supportedCharsets.getClass)
-      s"""
-        String $toCharset = $charset.toString();
-        try {
-          if ($legacyCharsets || $sc.contains($toCharset.toUpperCase(java.util.Locale.ROOT))) {
-            ${ev.value} = $string.toString().getBytes($toCharset);
-          } else {
-            throw new java.io.UnsupportedEncodingException();
-          }
-        } catch (java.io.UnsupportedEncodingException e) {
-          throw QueryExecutionErrors.invalidCharsetError("$prettyName", $toCharset);
-        }"""
-    })
-  }
+  override def toString: String = s"$prettyName($str, $charset)"
 
-  override protected def withNewChildrenInternal(
-      newLeft: Expression, newRight: Expression): Encode = copy(str = newLeft, charset = newRight)
+  override def children: Seq[Expression] = Seq(str, charset)
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+    copy(str = newChildren.head, charset = newChildren(1))
 }
 
 object Encode {
   def apply(value: Expression, charset: Expression): Encode = new Encode(value, charset)
+
+  private[expressions] final lazy val VALID_CHARSETS =
+    Set("US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16", "UTF-32")
+
+  def encode(
+      input: UTF8String,
+      charset: UTF8String,
+      legacyCharsets: Boolean,
+      legacyErrorAction: Boolean): Array[Byte] = {
+    val toCharset = charset.toString
+    if (legacyCharsets || VALID_CHARSETS.contains(toCharset.toUpperCase(Locale.ROOT))) {
+      val encoder = try {
+        val codingErrorAction = if (legacyErrorAction) {
+          CodingErrorAction.REPLACE
+        } else {
+          CodingErrorAction.REPORT
+        }
+        Charset.forName(toCharset)
+          .newEncoder()
+          .onMalformedInput(codingErrorAction)
+          .onUnmappableCharacter(codingErrorAction)
+      } catch {
+        case _: IllegalCharsetNameException |
+             _: UnsupportedCharsetException |
+             _: IllegalArgumentException =>
+          throw QueryExecutionErrors.invalidCharsetError("encode", toCharset)
+      }
+      try {
+        val bb = encoder.encode(CharBuffer.wrap(input.toString))
+        JavaUtils.bufferToArray(bb)
+      } catch {
+        case _: CharacterCodingException =>
+          throw QueryExecutionErrors.malformedCharacterCoding("encode", toCharset)
+      }
+    } else {
+      throw QueryExecutionErrors.invalidCharsetError("encode", toCharset)
+    }
+  }
 }
 
 /**
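
The core of the new `Encode.encode`/`StringDecode.decode` helpers is the `java.nio.charset` CodingErrorAction switch. The standalone sketch below (plain JDK usage, not Spark code) shows why REPORT surfaces an exception that the helpers translate into MALFORMED_CHARACTER_CODING, while REPLACE reproduces the legacy '?' mojibake:

```scala
import java.nio.CharBuffer
import java.nio.charset.{CharacterCodingException, Charset, CodingErrorAction}

// Encode a string to US-ASCII with a configurable error action, returning
// either the coding failure or the raw bytes.
def asciiBytes(s: String, action: CodingErrorAction): Either[String, Array[Byte]] = {
  val encoder = Charset.forName("US-ASCII")
    .newEncoder()
    .onMalformedInput(action)
    .onUnmappableCharacter(action)
  try {
    val bb = encoder.encode(CharBuffer.wrap(s))
    val out = new Array[Byte](bb.remaining())
    bb.get(out)
    Right(out)
  } catch {
    case e: CharacterCodingException => Left(e.toString)
  }
}

asciiBytes("渭城朝雨浥轻尘", CodingErrorAction.REPLACE) // Right: bytes of "???????" (legacy mojibake)
asciiBytes("渭城朝雨浥轻尘", CodingErrorAction.REPORT)  // Left: UnmappableCharacterException (new error path)
```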

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 8 additions & 0 deletions

@@ -2741,6 +2741,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "charset" -> charset))
   }
 
+  def malformedCharacterCoding(functionName: String, charset: String): RuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "MALFORMED_CHARACTER_CODING",
+      messageParameters = Map(
+        "function" -> toSQLId(functionName),
+        "charset" -> charset))
+  }
+
   def invalidWriterCommitMessageError(details: String): Throwable = {
     new SparkRuntimeException(
       errorClass = "INVALID_WRITER_COMMIT_MESSAGE",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

@@ -5018,6 +5018,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_CODING_ERROR_ACTION = buildConf("spark.sql.legacy.codingErrorAction")
+    .internal()
+    .doc("When set to true, encode/decode functions replace unmappable characters with mojibake " +
+      "instead of reporting coding errors.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_EVAL_CURRENT_TIME = buildConf("spark.sql.legacy.earlyEvalCurrentTime")
     .internal()
     .doc("When set to true, evaluation and constant folding will happen for now() and " +
@@ -5994,6 +6002,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def legacyJavaCharsets: Boolean = getConf(SQLConf.LEGACY_JAVA_CHARSETS)
 
+  def legacyCodingErrorAction: Boolean = getConf(SQLConf.LEGACY_CODING_ERROR_ACTION)
+
   def legacyEvalCurrentTime: Boolean = getConf(SQLConf.LEGACY_EVAL_CURRENT_TIME)
 
   /** ********************** SQLConf functionality methods ************ */
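
The new flag is internal and defaults to false. A hedged sketch of opting back into the legacy replacement behavior for a session (assumed `spark` session; intended only as an escape hatch):

```scala
// Restores the pre-4.0 behavior: unmappable characters are replaced with '?'
// bytes instead of raising MALFORMED_CHARACTER_CODING.
spark.conf.set("spark.sql.legacy.codingErrorAction", "true")
spark.sql("SELECT encode('渭城朝雨浥轻尘', 'US-ASCII')").show() // mojibake bytes, no error
```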

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("SPARK-22543: split large if expressions into blocks due to JVM code size limit") {
     var strExpr: Expression = Literal("abc")
     for (_ <- 1 to 150) {
-      strExpr = StringDecode(Encode(strExpr, "utf-8"), "utf-8")
+      strExpr = StringTrimRight(StringTrimLeft(strExpr))
     }
 
     val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala

Lines changed: 6 additions & 1 deletion

@@ -71,10 +71,15 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
     new ArrayBasedMapData(keyArray, valueArray)
   }
 
+  protected def replace(expr: Expression): Expression = expr match {
+    case r: RuntimeReplaceable => replace(r.replacement)
+    case _ => expr.mapChildren(replace)
+  }
+
   private def prepareEvaluation(expression: Expression): Expression = {
     val serializer = new JavaSerializer(new SparkConf()).newInstance()
     val resolver = ResolveTimeZone
-    val expr = resolver.resolveTimeZones(expression)
+    val expr = resolver.resolveTimeZones(replace(expression))
     assert(expr.resolved)
     serializer.deserialize(serializer.serialize(expr))
   }
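
Because Encode and StringDecode are now RuntimeReplaceable, the eval helper has to unwrap them before evaluation. A rough sketch of what that replacement step amounts to for a single expression (illustrative, not the suite's exact wiring):

```scala
// A RuntimeReplaceable node is never evaluated directly; tests evaluate its
// `replacement` (here a StaticInvoke into Encode.encode) instead.
val enc = new Encode(Literal("abc"), Literal("UTF-8"))
val evaluable = enc.replacement // StaticInvoke(classOf[Encode], BinaryType, "encode", ...)
```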

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -505,8 +505,8 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(StringDecode(b, Literal.create(null, StringType)), null, create_row(null))
 
     // Test escaping of charset
-    GenerateUnsafeProjection.generate(Encode(a, Literal("\"quote")) :: Nil)
-    GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")) :: Nil)
+    GenerateUnsafeProjection.generate(Encode(a, Literal("\"quote")).replacement :: Nil)
+    GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")).replacement :: Nil)
   }
 
   test("initcap unit test") {
