Commit b0b3cd6

Brush up UTF8StringBuilder
1 parent b5b5e35 commit b0b3cd6

2 files changed (+52, -45 lines)

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java

Lines changed: 47 additions & 10 deletions
@@ -19,34 +19,71 @@
 
 import java.nio.charset.StandardCharsets;
 
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * A helper class to write `UTF8String`, `String`, and `byte[]` data into an internal buffer
- * and get a final concatenated string.
+ * A helper class to write `UTF8String`, `String`, and `byte[]` data into an internal byte buffer
+ * and get written data as `UTF8String`.
  */
 public class UTF8StringBuilder {
 
-  private StringBuilder buffer;
+  private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
 
   public UTF8StringBuilder() {
-    this.buffer = new StringBuilder();
+    // Since initial buffer size is 16 in `StringBuilder`, we set the same size here
+    this.buffer = new byte[16];
+  }
+
+  // Grows the buffer by at least `neededSize`
+  private void grow(int neededSize) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
+      throw new UnsupportedOperationException(
+        "Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
+          "exceeds size limitation " + ARRAY_MAX);
+    }
+    final int length = totalSize() + neededSize;
+    if (buffer.length < length) {
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
+      final byte[] tmp = new byte[newLength];
+      Platform.copyMemory(
+        buffer,
+        Platform.BYTE_ARRAY_OFFSET,
+        tmp,
+        Platform.BYTE_ARRAY_OFFSET,
+        totalSize());
+      buffer = tmp;
+    }
   }
 
   public void append(UTF8String value) {
-    buffer.append(value);
+    grow(value.numBytes());
+    value.writeToMemory(buffer, cursor);
+    cursor += value.numBytes();
   }
 
   public void append(String value) {
-    buffer.append(value);
+    append(value.getBytes(StandardCharsets.UTF_8));
   }
 
   public void append(byte[] value) {
-    buffer.append(new String(value, StandardCharsets.UTF_8));
+    grow(value.length);
+    Platform.copyMemory(value, Platform.BYTE_ARRAY_OFFSET, buffer, cursor, value.length);
+    cursor += value.length;
+  }
+
+  public UTF8String toUTF8String() {
+    final int len = totalSize();
+    final byte[] bytes = new byte[len];
+    Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, bytes, Platform.BYTE_ARRAY_OFFSET, len);
+    return UTF8String.fromBytes(bytes);
   }
 
-  @Override
-  public String toString() {
-    return buffer.toString();
+  public int totalSize() {
+    return cursor - Platform.BYTE_ARRAY_OFFSET;
   }
 }
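
The rewritten builder appends raw UTF-8 bytes into a growable byte[] and hands the result back as a single UTF8String via toUTF8String(), instead of round-tripping through java.lang.String. A minimal usage sketch follows; the object name and sample values are illustrative only and not part of this commit:

import java.nio.charset.StandardCharsets

import org.apache.spark.sql.catalyst.expressions.codegen.UTF8StringBuilder
import org.apache.spark.unsafe.types.UTF8String

object UTF8StringBuilderSketch {
  def main(args: Array[String]): Unit = {
    val buf = new UTF8StringBuilder
    buf.append("[")                                      // String overload: encoded to UTF-8 bytes
    buf.append(UTF8String.fromString("abc"))             // UTF8String overload: bytes copied directly
    buf.append(", ")
    buf.append("def".getBytes(StandardCharsets.UTF_8))   // byte[] overload: appended as-is
    buf.append("]")
    // All appended bytes come back as one UTF8String
    println(buf.toUTF8String)                            // prints: [abc, def]
  }
}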

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 5 additions & 35 deletions
@@ -199,36 +199,6 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
 
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
-  @inline private[this] def buildWriter[T](
-      a: Any, buffer: UTF8StringBuilder, writer: (T, UTF8StringBuilder) => Unit): Unit = {
-    writer(a.asInstanceOf[T], buffer)
-  }
-
-  private[this] def buildElemWriter(
-      from: DataType): (Any, UTF8StringBuilder) => Unit = from match {
-    case BinaryType => buildWriter[Array[Byte]](_, _, (b, buf) => buf.append(b))
-    case StringType => buildWriter[UTF8String](_, _, (b, buf) => buf.append(b))
-    case DateType => buildWriter[Int](_, _,
-      (d, buf) => buf.append(DateTimeUtils.dateToString(d)))
-    case TimestampType => buildWriter[Long](_, _,
-      (t, buf) => buf.append(DateTimeUtils.timestampToString(t)))
-    case ar: ArrayType =>
-      buildWriter[ArrayData](_, _, (array, buf) => {
-        buf.append("[")
-        if (array.numElements > 0) {
-          val writeElemToBuffer = buildElemWriter(ar.elementType)
-          writeElemToBuffer(array.get(0, ar.elementType), buf)
-          var i = 1
-          while (i < array.numElements) {
-            buf.append(", ")
-            writeElemToBuffer(array.get(i, ar.elementType), buf)
-            i += 1
-          }
-        }
-        buf.append("]")
-      })
-    case _ => buildWriter[Any](_, _, (o, buf) => buf.append(String.valueOf(o)))
-  }
 
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
@@ -241,17 +211,17 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
         val res = new UTF8StringBuilder
         res.append("[")
         if (array.numElements > 0) {
-          val writeElemToBuffer = buildElemWriter(ar.elementType)
-          writeElemToBuffer(array.get(0, ar.elementType), res)
+          val toUTF8String = castToString(ar.elementType)
+          res.append(toUTF8String(array.get(0, ar.elementType)).asInstanceOf[UTF8String])
           var i = 1
           while (i < array.numElements) {
             res.append(", ")
-            writeElemToBuffer(array.get(i, ar.elementType), res)
+            res.append(toUTF8String(array.get(i, ar.elementType)).asInstanceOf[UTF8String])
            i += 1
          }
        }
        res.append("]")
-       UTF8String.fromString(res.toString)
+       res.toUTF8String
      })
    case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
  }
@@ -709,7 +679,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
          s"""
             |$bufferClass $bufferTerm = new $bufferClass();
            |$writeArrayToBuffer($c, $bufferTerm);
-            |$evPrim = UTF8String.fromString($bufferTerm.toString());
+            |$evPrim = $bufferTerm.toUTF8String();
           """.stripMargin
        }
      case _ =>
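
With buildWriter/buildElemWriter removed, array elements are now rendered by reusing castToString recursively, and the buffer is returned directly with toUTF8String instead of going through res.toString. A rough end-to-end sketch of the observable behavior; the local SparkSession setup below is assumed purely for illustration and is not part of this change:

import org.apache.spark.sql.SparkSession

object ArrayCastToStringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("ArrayCastToStringSketch")
      .getOrCreate()
    // Each element goes through the same castToString path, and the pieces are
    // joined with "[", ", ", and "]" inside UTF8StringBuilder.
    val rendered = spark.sql("SELECT CAST(array(1, 2, 3) AS STRING)").head().getString(0)
    println(rendered)  // expected: [1, 2, 3]
    spark.stop()
  }
}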
