Skip to content

Commit 791b96a

Browse files
committed
Use UTF8String.
1 parent 60a1487 commit 791b96a

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.shuffle.sort.SortShuffleManager
22-
import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
22+
import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
2323
import org.apache.spark.rdd.{RDD, ShuffledRDD}
2424
import org.apache.spark.serializer.Serializer
2525
import org.apache.spark.sql.{SQLContext, Row}
@@ -79,7 +79,7 @@ case class Exchange(
7979
}
8080
}
8181

82-
private lazy val sparkConf = child.sqlContext.sparkContext.getConf
82+
@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
8383

8484
def serializer(
8585
keySchema: Array[DataType],
@@ -92,10 +92,10 @@ case class Exchange(
9292
SparkSqlSerializer2.support(valueSchema)
9393

9494
val serializer = if (useSqlSerializer2) {
95-
logInfo("Use ShuffleSerializer")
95+
logInfo("Use SparkSqlSerializer2.")
9696
new SparkSqlSerializer2(keySchema, valueSchema)
9797
} else {
98-
logInfo("Use SparkSqlSerializer")
98+
logInfo("Use SparkSqlSerializer.")
9999
new SparkSqlSerializer(sparkConf)
100100
}
101101

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,9 @@ private[sql] object SparkSqlSerializer2 {
263263
out.writeByte(NULL)
264264
} else {
265265
out.writeByte(NOT_NULL)
266-
// TODO: Update it once the string improvement is in.
267-
out.writeUTF(row.getString(i))
266+
val bytes = row.getAs[UTF8String](i).getBytes
267+
out.writeInt(bytes.length)
268+
out.write(bytes)
268269
}
269270

270271
case BinaryType =>
@@ -386,8 +387,11 @@ private[sql] object SparkSqlSerializer2 {
386387
if (in.readByte() == NULL) {
387388
mutableRow.setNullAt(i)
388389
} else {
389-
// TODO: Update it once the string improvement is in.
390-
mutableRow.setString(i, in.readUTF())
390+
// TODO: reuse the byte array in the UTF8String.
391+
val length = in.readInt()
392+
val bytes = new Array[Byte](length)
393+
in.readFully(bytes)
394+
mutableRow.update(i, UTF8String(bytes))
391395
}
392396

393397
case BinaryType =>

0 commit comments

Comments
 (0)