
Commit 7bfbeb6

amaliujia authored and hvanhovell committed
[SPARK-44326][SQL][CONNECT] Move utils that are used from Scala client to the common modules
### What changes were proposed in this pull request?
There are some utils used in the Scala client, including serialization (ser/de), datetime and interval utils. These can be moved to the common modules.

### Why are the changes needed?
To make sure the Scala client does not depend on Catalyst in the future.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #41885 from amaliujia/SPARK-44326.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
1 parent f7d47e8 commit 7bfbeb6

9 files changed: +345, -235 lines changed
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ByteArrayOutputStream, ObjectOutputStream}
+
+object SparkSerDerseUtils {
+  /** Serialize an object using Java serialization */
+  def serialize[T](o: T): Array[Byte] = {
+    val bos = new ByteArrayOutputStream()
+    val oos = new ObjectOutputStream(bos)
+    oos.writeObject(o)
+    oos.close()
+    bos.toByteArray
+  }
+}
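
For orientation, here is a minimal sketch of how the new helper can be exercised end to end. Only `serialize` is added by this commit; the `deserializeForTest` counterpart below is hypothetical and included purely to show the round trip.

import java.io.{ByteArrayInputStream, ObjectInputStream}

import org.apache.spark.util.SparkSerDerseUtils

// Hypothetical inverse of SparkSerDerseUtils.serialize, for illustration only;
// it is not part of this commit.
def deserializeForTest[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}

// Case classes are Serializable, so they survive the Java-serialization round trip.
case class Packet(name: String, values: Seq[Int])

val bytes = SparkSerDerseUtils.serialize(Packet("udf", Seq(1, 2, 3)))
assert(deserializeForTest[Packet](bytes) == Packet("udf", Seq(1, 2, 3)))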

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala

Lines changed: 3 additions & 3 deletions
@@ -22,8 +22,8 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.Duration

 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis
-import org.apache.spark.sql.catalyst.util.IntervalUtils
+import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.microsToMillis
+import org.apache.spark.sql.catalyst.util.SparkIntervalUtils
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.unsafe.types.UTF8String

@@ -35,7 +35,7 @@ private object Triggers {
   }

   def convert(interval: String): Long = {
-    val cal = IntervalUtils.stringToInterval(UTF8String.fromString(interval))
+    val cal = SparkIntervalUtils.stringToInterval(UTF8String.fromString(interval))
     if (cal.months != 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
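
Only part of `Triggers.convert` appears in the hunk above; as a hedged sketch, the full conversion presumably parses the interval string, rejects month components, and reduces the remaining day/microsecond fields to milliseconds, roughly like this:

import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.microsToMillis
import org.apache.spark.sql.catalyst.util.SparkIntervalUtils
import org.apache.spark.unsafe.types.UTF8String

// Sketch only: assumes the rest of convert combines the day and microsecond
// fields of the parsed CalendarInterval as shown here.
def intervalStringToMillis(interval: String): Long = {
  val cal = SparkIntervalUtils.stringToInterval(UTF8String.fromString(interval))
  if (cal.months != 0) {
    throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
  }
  microsToMillis(cal.days * MICROS_PER_DAY + cal.microseconds)
}

// e.g. intervalStringToMillis("10 seconds") == 10000L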

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, UdfPacket}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkSerDerseUtils

 /**
  * A user-defined function. To create one, use the `udf` functions in `functions`.
@@ -103,7 +103,8 @@ case class ScalarUserDefinedFunction(

   // SPARK-43198: Eagerly serialize to prevent the UDF from containing a reference to this class.
   private[this] val udf = {
-    val udfPacketBytes = Utils.serialize(UdfPacket(function, inputEncoders, outputEncoder))
+    val udfPacketBytes =
+      SparkSerDerseUtils.serialize(UdfPacket(function, inputEncoders, outputEncoder))
     val scalaUdfBuilder = proto.ScalarScalaUDF
       .newBuilder()
       .setPayload(ByteString.copyFrom(udfPacketBytes))
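
The second hunk keeps the SPARK-43198 pattern intact: the payload is serialized eagerly in the `val` initializer, so the resulting bytes do not capture the enclosing instance. A rough, self-contained sketch of that pattern with hypothetical names:

import org.apache.spark.util.SparkSerDerseUtils

// Hypothetical stand-in for UdfPacket: any Serializable payload works.
case class Payload(fn: String => Int, label: String)

class Wrapper(heavyState: Array[Byte]) {
  // Eager serialization: only Payload is written out, so the bytes carry no
  // reference to Wrapper (and therefore none to heavyState).
  private[this] val payloadBytes: Array[Byte] =
    SparkSerDerseUtils.serialize(Payload(_.length, "demo"))

  def bytes: Array[Byte] = payloadBytes
}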

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 2 additions & 2 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.OneTimeTrigger
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkSerDerseUtils

 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -214,7 +214,7 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
   * @since 3.5.0
   */
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
-    val serialized = Utils.serialize(ForeachWriterPacket(writer, ds.encoder))
+    val serialized = SparkSerDerseUtils.serialize(ForeachWriterPacket(writer, ds.encoder))
     val scalaWriterBuilder = proto.ScalarScalaUDF
       .newBuilder()
       .setPayload(ByteString.copyFrom(serialized))
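
For context, `foreach` is the public streaming API this hunk touches; a minimal `ForeachWriter` that the Connect client would serialize through the new helper looks roughly like this (the sink logic is illustrative only):

import org.apache.spark.sql.ForeachWriter

// Illustrative sink that prints every row of a streaming Dataset[String].
val consoleWriter = new ForeachWriter[String] {
  def open(partitionId: Long, epochId: Long): Boolean = true
  def process(value: String): Unit = println(s"row: $value")
  def close(errorOrNull: Throwable): Unit = ()
}

// Assuming `stream` is a streaming Dataset[String]:
// stream.writeStream.foreach(consoleWriter).start()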

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 5 deletions
@@ -121,11 +121,7 @@ private[spark] object Utils extends Logging with SparkClassUtils {

   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bos)
-    oos.writeObject(o)
-    oos.close()
-    bos.toByteArray
+    SparkSerDerseUtils.serialize(o)
   }

   /** Deserialize an object using Java serialization */
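
The delegation leaves the existing behaviour unchanged; a quick round-trip check (only callable from Spark-internal code, since `Utils` is `private[spark]`):

import org.apache.spark.util.Utils

// Utils.serialize now forwards to SparkSerDerseUtils.serialize, so existing
// callers get identical bytes and Utils.deserialize still restores them.
val bytes = Utils.serialize(Map("a" -> 1, "b" -> 2))
assert(Utils.deserialize[Map[String, Int]](bytes) == Map("a" -> 1, "b" -> 2))
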
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
+
+object SparkDateTimeUtils {
+  /**
+   * Converts the timestamp to milliseconds since epoch. In Spark timestamp values have microseconds
+   * precision, so this conversion is lossy.
+   */
+  def microsToMillis(micros: Long): Long = {
+    // When the timestamp is negative i.e before 1970, we need to adjust the milliseconds portion.
+    // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
+    // In millis precision the above needs to be represented as (-157700927877).
+    Math.floorDiv(micros, MICROS_PER_MILLIS)
+  }
+
+  /**
+   * Converts milliseconds since the epoch to microseconds.
+   */
+  def millisToMicros(millis: Long): Long = {
+    Math.multiplyExact(millis, MICROS_PER_MILLIS)
+  }
+}
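
A small worked check of the rounding behaviour described in the comment: `Math.floorDiv` rounds toward negative infinity, so pre-1970 timestamps land on the earlier millisecond, unlike plain integer division.

import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._

// 1965-01-01 10:11:12.123456 expressed as microseconds since the epoch.
val micros = -157700927876544L
assert(microsToMillis(micros) == -157700927877L) // floor, not truncation
assert(micros / 1000L == -157700927876L)         // plain division would round the wrong way here

// The reverse direction is exact and overflow-checked via multiplyExact.
assert(millisToMicros(42L) == 42000L)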
