[SPARK-48498][SQL] Always do char padding in predicates #46832


Status: Closed. Wants to merge 1 commit.
@@ -4603,6 +4603,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val LEGACY_NO_CHAR_PADDING_IN_PREDICATE = buildConf("spark.sql.legacy.noCharPaddingInPredicate")
Contributor:

Because the legacy behavior is always padding, how about LEGACY_CHAR_PADDING_IN_PREDICATE with the default value changed to true?

Contributor Author:

The legacy behavior is no padding.

Contributor:

> However, there is a bug in Spark that we always pad the string literal when comparing CHAR type and STRING literals, which assumes the CHAR type columns are always padded, either on the write side or read side.

I'm a bit confused.

Contributor Author (@cloud-fan, Jun 4, 2024):

Let me make it clear: it was talking about the case when people disable spark.sql.readSideCharPadding.

Contributor:

Got it. Thank you.
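To make the mismatch discussed above concrete, here is a hypothetical illustration in plain Scala (not Spark code): with read-side padding disabled, a CHAR(3) value may be stored unpadded, while the old rule left equal-length comparisons untouched, so the stored value never matched a literal that was already at the CHAR length.

```scala
// Illustrative sketch only; values and lengths are assumptions.
val stored = "12"   // CHAR(3) value written without trailing spaces
val literal = "12 " // literal already at the CHAR length (3 chars)

// Old behavior: equal lengths meant no padding, so the row is missed.
assert(stored != literal)

// New behavior: pad the CHAR side too, so both operands reach length 3.
val padTo3 = (s: String) => s.padTo(3, ' ')
assert(padTo3(stored) == padTo3(literal)) // now they match
```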

.internal()
.doc("When true, Spark will not apply char type padding for CHAR type columns in string " +
s"comparison predicates, when '${READ_SIDE_CHAR_PADDING.key}' is false.")
.version("4.0.0")
Member:

QQ: Is it needed by 3.5 or earlier?

Contributor Author (@cloud-fan, Jun 5, 2024):

Ideally yes, but it's only a problem if people turn off READ_SIDE_CHAR_PADDING, so it's not a bug by default, and I don't have a strong preference on backporting.

Contributor:

We met SPARK-48562 in Spark 3.5, which is a bug in ApplyCharTypePadding when writing JDBC temporary views; it led us to turn off READ_SIDE_CHAR_PADDING while creating JDBC temp views. Thus, we need to either backport this PR or fix SPARK-48562 in 3.5.

Contributor Author:

I see. Feel free to create a backport PR and I can help merge it.

.booleanConf
.createWithDefault(false)
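A minimal standalone sketch of what this flag controls (my simplification in plain Scala on strings, not the actual Catalyst rule, which rewrites expressions): compare a stored CHAR(n) value, possibly unpadded on disk, against a string literal. `charEqLiteral` and `legacyNoPadding` are hypothetical names modeling the config above.

```scala
// rpad with spaces, as StringRPad does in these cases (never truncates here).
def rpad(s: String, len: Int): String =
  if (s.length >= len) s else s + " " * (len - s.length)

// legacyNoPadding models spark.sql.legacy.noCharPaddingInPredicate.
// Even in legacy mode, the CHAR side was padded when the literal was longer.
def charEqLiteral(stored: String, n: Int, lit: String, legacyNoPadding: Boolean): Boolean = {
  val target = math.max(n, lit.length)
  val padCharCol = !legacyNoPadding || lit.length > n
  val left = if (padCharCol) rpad(stored, target) else stored
  left == rpad(lit, target)
}

// With the fix (default): the unpadded stored "12" matches the literal "12".
assert(charEqLiteral("12", 3, "12", legacyNoPadding = false))
// Legacy behavior: the same row is missed.
assert(!charEqLiteral("12", 3, "12", legacyNoPadding = true))
```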

val CLI_PRINT_HEADER =
buildConf("spark.sql.cli.print.header")
.doc("When set to true, spark-sql CLI prints the names of the columns in query output.")
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{BINARY_COMPARISON, IN}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{CharType, Metadata, StringType}
import org.apache.spark.unsafe.types.UTF8String

@@ -66,9 +67,10 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
r.copy(dataCols = cleanedDataCols, partitionCols = cleanedPartCols)
})
}
paddingForStringComparison(newPlan)
paddingForStringComparison(newPlan, padCharCol = false)
} else {
paddingForStringComparison(plan)
paddingForStringComparison(
plan, padCharCol = !conf.getConf(SQLConf.LEGACY_NO_CHAR_PADDING_IN_PREDICATE))
}
}

@@ -90,7 +92,7 @@ }
}
}

private def paddingForStringComparison(plan: LogicalPlan): LogicalPlan = {
private def paddingForStringComparison(plan: LogicalPlan, padCharCol: Boolean): LogicalPlan = {
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON, IN)) {
case operator => operator.transformExpressionsUpWithPruning(
_.containsAnyPattern(BINARY_COMPARISON, IN)) {
@@ -99,12 +101,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
// String literal is treated as char type when it's compared to a char type column.
// We should pad the shorter one to the longer length.
case b @ BinaryComparison(e @ AttrOrOuterRef(attr), lit) if lit.foldable =>
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
padAttrLitCmp(e, attr.metadata, padCharCol, lit).map { newChildren =>
b.withNewChildren(newChildren)
}.getOrElse(b)

case b @ BinaryComparison(lit, e @ AttrOrOuterRef(attr)) if lit.foldable =>
padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
padAttrLitCmp(e, attr.metadata, padCharCol, lit).map { newChildren =>
b.withNewChildren(newChildren.reverse)
}.getOrElse(b)

@@ -117,9 +119,10 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
val literalCharLengths = literalChars.map(_.numChars())
val targetLen = (length +: literalCharLengths).max
Some(i.copy(
value = addPadding(e, length, targetLen),
value = addPadding(e, length, targetLen, alwaysPad = padCharCol),
list = list.zip(literalCharLengths).map {
case (lit, charLength) => addPadding(lit, charLength, targetLen)
case (lit, charLength) =>
addPadding(lit, charLength, targetLen, alwaysPad = false)
} ++ nulls.map(Literal.create(_, StringType))))
case _ => None
}.getOrElse(i)
@@ -162,6 +165,7 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
private def padAttrLitCmp(
expr: Expression,
metadata: Metadata,
padCharCol: Boolean,
lit: Expression): Option[Seq[Expression]] = {
if (expr.dataType == StringType) {
CharVarcharUtils.getRawType(metadata).flatMap {
@@ -174,7 +178,14 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
if (length < stringLitLen) {
Some(Seq(StringRPad(expr, Literal(stringLitLen)), lit))
} else if (length > stringLitLen) {
Some(Seq(expr, StringRPad(lit, Literal(length))))
val paddedExpr = if (padCharCol) {
StringRPad(expr, Literal(length))
} else {
expr
}
Some(Seq(paddedExpr, StringRPad(lit, Literal(length))))
} else if (padCharCol) {
Some(Seq(StringRPad(expr, Literal(length)), lit))
} else {
None
}
@@ -186,7 +197,15 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
}
}

private def addPadding(expr: Expression, charLength: Int, targetLength: Int): Expression = {
if (targetLength > charLength) StringRPad(expr, Literal(targetLength)) else expr
private def addPadding(
expr: Expression,
charLength: Int,
targetLength: Int,
alwaysPad: Boolean): Expression = {
if (targetLength > charLength) {
StringRPad(expr, Literal(targetLength))
} else if (alwaysPad) {
StringRPad(expr, Literal(charLength))
} else expr
}
}
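The updated `addPadding` helper above can be modeled in isolation like this (a plain-Scala sketch on strings; the real method returns a `StringRPad` Catalyst expression rather than a padded value):

```scala
// Space-padding helper mirroring StringRPad for these non-truncating cases.
def rpad(s: String, len: Int): String =
  if (s.length >= len) s else s + " " * (len - s.length)

// Mirror of the new addPadding decision, on plain strings.
def addPadding(value: String, charLength: Int, targetLength: Int, alwaysPad: Boolean): String =
  if (targetLength > charLength) rpad(value, targetLength) // pad up to the longer side
  else if (alwaysPad) rpad(value, charLength)              // new: pad even at equal lengths
  else value                                               // legacy: leave untouched

assert(addPadding("12", 3, 4, alwaysPad = false) == "12  ")
assert(addPadding("12", 3, 3, alwaysPad = true) == "12 ")
assert(addPadding("12", 3, 3, alwaysPad = false) == "12")
```

The `alwaysPad` branch is the behavioral change: previously an equal-length CHAR column was returned untouched, which only worked if the stored value was already padded.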
@@ -942,6 +942,34 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
}
}
}

test("SPARK-48498: always do char padding in predicates") {
import testImplicits._
withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> "false") {
Comment:

If LEGACY_NO_CHAR_PADDING_IN_PREDICATE is set to true, this case will fail.

If it is not set to true, we may fail to get data when a CHAR column is compared to a literal, because the literal would be padded but the CHAR column would not. Any idea?

Contributor Author:

LEGACY_NO_CHAR_PADDING_IN_PREDICATE means no padding, and this test will fail if that config is true, since this test wants padding.

withTempPath { dir =>
withTable("t") {
Seq(
"12" -> "12",
"12" -> "12 ",
"12 " -> "12",
"12 " -> "12 "
).toDF("c1", "c2").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (c1 CHAR(3), c2 STRING) USING $format LOCATION '$dir'")
// Comparing a CHAR column with a STRING column directly compares the stored values.
checkAnswer(
sql("SELECT c1 = c2 FROM t"),
Seq(Row(true), Row(false), Row(false), Row(true))
)
// No matter whether the CHAR type value is padded in storage, we should always pad it
// before comparing it with STRING literals.
checkAnswer(
sql("SELECT c1 = '12', c1 = '12 ', c1 = '12 ' FROM t WHERE c2 = '12'"),
Seq(Row(true, true, true), Row(true, true, true))
)
}
}
}
}
}

class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
@@ -256,9 +256,11 @@ trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
protected def testQuery(tpcdsGroup: String, query: String, suffix: String = ""): Unit = {
val queryString = resourceToString(s"$tpcdsGroup/$query.sql",
classLoader = Thread.currentThread().getContextClassLoader)
// Disable char/varchar read-side handling for better performance.
withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
withSQLConf(
// Disable char/varchar read-side handling for better performance.
SQLConf.READ_SIDE_CHAR_PADDING.key -> "false",
SQLConf.LEGACY_NO_CHAR_PADDING_IN_PREDICATE.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
val qe = sql(queryString).queryExecution
val plan = qe.executedPlan
val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))