Skip to content

Commit f1f5bb6

Browse files
wForget and Ted Jenks
authored and committed
[SPARK-47307][SQL][3.5] Add a config to optionally chunk base64 strings
Backports #47303 to 3.5 ### What changes were proposed in this pull request? [[SPARK-47307](https://issues.apache.org/jira/browse/SPARK-47307)] Add a config to optionally chunk base64 strings ### Why are the changes needed? In #35110, it was incorrectly asserted that: > ApacheCommonBase64 obeys http://www.ietf.org/rfc/rfc2045.txt This is not true as the previous code called: ```java public static byte[] encodeBase64(byte[] binaryData) ``` Which states: > Encodes binary data using the base64 algorithm but does not chunk the output. However, the RFC 2045 (MIME) base64 encoder does chunk by default. This now means that any Spark-encoded base64 strings cannot be decoded by decoders that do not implement RFC 2045. The docs state RFC 4648. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suite. ### Was this patch authored or co-authored using generative AI tooling? No Closes #47325 from wForget/SPARK-47307_3.5. Lead-authored-by: wforget <643348094@qq.com> Co-authored-by: Ted Jenks <tedcj@palantir.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 4d1bbfd commit f1f5bb6

File tree

5 files changed

+59
-14
lines changed

5 files changed

+59
-14
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [base64(cast(g#0 as binary)) AS base64(g)#0]
1+
Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.Base64, StringType, encode, cast(g#0 as binary), false, BinaryType, BooleanType, true, true, true) AS base64(g)#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2419,24 +2419,40 @@ case class Chr(child: Expression)
24192419
""",
24202420
since = "1.5.0",
24212421
group = "string_funcs")
2422-
case class Base64(child: Expression)
2423-
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
2422+
case class Base64(child: Expression, chunkBase64: Boolean)
2423+
extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
2424+
2425+
def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)
24242426

24252427
override def dataType: DataType = StringType
24262428
override def inputTypes: Seq[DataType] = Seq(BinaryType)
24272429

2428-
protected override def nullSafeEval(bytes: Any): Any = {
2429-
UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
2430-
}
2430+
override def replacement: Expression = StaticInvoke(
2431+
classOf[Base64],
2432+
dataType,
2433+
"encode",
2434+
Seq(child, Literal(chunkBase64, BooleanType)),
2435+
Seq(BinaryType, BooleanType))
24312436

2432-
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
2433-
nullSafeCodeGen(ctx, ev, (child) => {
2434-
s"""${ev.value} = UTF8String.fromBytes(
2435-
${classOf[JBase64].getName}.getMimeEncoder().encode($child));
2436-
"""})
2437-
}
2437+
override def toString: String = s"$prettyName($child)"
24382438

2439-
override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild)
2439+
override protected def withNewChildInternal(newChild: Expression): Expression =
2440+
copy(child = newChild)
2441+
}
2442+
2443+
object Base64 {
2444+
def apply(expr: Expression): Base64 = new Base64(expr)
2445+
2446+
private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array())
2447+
2448+
def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = {
2449+
val encoder = if (chunkBase64) {
2450+
JBase64.getMimeEncoder
2451+
} else {
2452+
nonChunkEncoder
2453+
}
2454+
UTF8String.fromBytes(encoder.encode(input))
2455+
}
24402456
}
24412457

24422458
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3229,6 +3229,15 @@ object SQLConf {
32293229
.booleanConf
32303230
.createWithDefault(false)
32313231

3232+
val CHUNK_BASE64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled")
3233+
.internal()
3234+
.doc("Whether to chunk the string generated by the `Base64` function. When true, base64" +
3235+
" strings generated by the base64 function are chunked into lines of at most 76" +
3236+
" characters. When false, the base64 strings are not chunked.")
3237+
.version("3.5.2")
3238+
.booleanConf
3239+
.createWithDefault(false)
3240+
32323241
val ENABLE_DEFAULT_COLUMNS =
32333242
buildConf("spark.sql.defaultColumn.enabled")
32343243
.internal()
@@ -5111,6 +5120,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
51115120

51125121
def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE)
51135122

5123+
def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED)
5124+
51145125
def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
51155126
case "TIMESTAMP_LTZ" =>
51165127
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
7171
new ArrayBasedMapData(keyArray, valueArray)
7272
}
7373

74+
protected def replace(expr: Expression): Expression = expr match {
75+
case r: RuntimeReplaceable => replace(r.replacement)
76+
case _ => expr.mapChildren(replace)
77+
}
78+
7479
private def prepareEvaluation(expression: Expression): Expression = {
7580
val serializer = new JavaSerializer(new SparkConf()).newInstance
7681
val resolver = ResolveTimeZone
77-
val expr = resolver.resolveTimeZones(expression)
82+
val expr = resolver.resolveTimeZones(replace(expression))
7883
assert(expr.resolved)
7984
serializer.deserialize(serializer.serialize(expr))
8085
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
506506
GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")) :: Nil)
507507
}
508508

509+
test("SPARK-47307: base64 encoding without chunking") {
510+
val longString = "a" * 58
511+
val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
512+
withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") {
513+
checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
514+
}
515+
val chunkEncoded =
516+
s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
517+
withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") {
518+
checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
519+
}
520+
}
521+
509522
test("initcap unit test") {
510523
checkEvaluation(InitCap(Literal.create(null, StringType)), null)
511524
checkEvaluation(InitCap(Literal("a b")), "A B")

0 commit comments

Comments
 (0)