-
Notifications
You must be signed in to change notification settings - Fork 28.6k
Commit 771c538
[SPARK-33084][SQL][TESTS][FOLLOW-UP] Fix Scala 2.13 UT failure
### What changes were proposed in this pull request?
Fix UT according to #29966 (comment)
Change StructType construct from
```
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
```
to
```
def inputSchema: StructType = new StructType().add("inputColumn", LongType)
```
The whole udf class is :
```
package org.apache.spark.examples.sql
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class Spark33084 extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = new StructType().add("inputColumn", LongType)
// Data types of values in the aggregation buffer
def bufferSchema: StructType =
new StructType().add("sum", LongType).add("count", LongType)
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
```
### Why are the changes needed?
Fix UT for scala 2.13
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existed UT
Closes #30980 from AngersZhuuuu/spark-33084-followup.
Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>1 parent 3fe5614 commit 771c538Copy full SHA for 771c538
File tree
Expand file treeCollapse file tree
1 file changed
+0
-0
lines changedFilter options
- sql/core/src/test/resources
Expand file treeCollapse file tree
1 file changed
+0
-0
lines changedsql/core/src/test/resources/SPARK-33084.jar
Copy file name to clipboard-203 Bytes
Binary file not shown.
0 commit comments