
Commit ad61b0c

Formatting

1 parent 8053e58 commit ad61b0c

5 files changed: +24 −23 lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java

Lines changed: 8 additions & 8 deletions
@@ -61,12 +61,12 @@
 public interface ReducibleFunction<I, O> {

   /**
-   * This method is for parameterized functions.
+   * This method is for the bucket function.
    *
-   * If this parameterized function is 'reducible' on another bucket function,
+   * If this bucket function is 'reducible' on another bucket function,
    * return the {@link Reducer} function.
    * <p>
-   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * For example, to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
    * <ul>
    * <li>thisBucketFunction = bucket</li>
    * <li>thisNumBuckets = 4</li>
@@ -79,10 +79,10 @@ public interface ReducibleFunction<I, O> {
    * @param otherNumBuckets parameter for the other function
    * @return a reduction function if it is reducible, null if not
    */
-  default Reducer<I, O> bucketReducer(
-      int thisNumBuckets,
-      ReducibleFunction<?, ?> otherBucketFunction,
-      int otherNumBuckets) {
+  default Reducer<I, O> reducer(
+      int thisNumBuckets,
+      ReducibleFunction<?, ?> otherBucketFunction,
+      int otherNumBuckets) {
     throw new UnsupportedOperationException();
   }

@@ -100,7 +100,7 @@ default Reducer<I, O> bucketReducer(
    * @param otherFunction the other function
    * @return a reduction function if it is reducible, null if not.
    */
-  default Reducer<I, O> bucketReducer(ReducibleFunction<?, ?> otherFunction) {
+  default Reducer<I, O> reducer(ReducibleFunction<?, ?> otherFunction) {
     throw new UnsupportedOperationException();
   }
 }
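The Javadoc example above maps directly onto a small implementation. Below is a minimal sketch, not part of this commit, of a Reducer for the f_source = bucket(4, x) on f_target = bucket(2, x) case; ModReducer is a hypothetical name, and the sketch assumes Reducer exposes a single reduce method:

// Hypothetical sketch: remap values from a 4-bucket layout onto a 2-bucket one.
// When thisNumBuckets is a multiple of otherNumBuckets, taking the bucket value
// modulo otherNumBuckets lands every row in the coarser bucket it belongs to.
case class ModReducer(otherNumBuckets: Int) extends Reducer[Int, Int] {
  override def reduce(bucketValue: Int): Int = bucketValue % otherNumBuckets
}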

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala

Lines changed: 7 additions & 6 deletions
@@ -96,14 +96,15 @@ case class TransformExpression(
   }

   // Return a Reducer for a reducible function on another reducible function
-  private def reducer(thisFunction: ReducibleFunction[_, _],
-      thisNumBucketsOpt: Option[Int],
-      otherFunction: ReducibleFunction[_, _],
-      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
+  private def reducer(
+      thisFunction: ReducibleFunction[_, _],
+      thisNumBucketsOpt: Option[Int],
+      otherFunction: ReducibleFunction[_, _],
+      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
     val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
       case (Some(numBuckets), Some(otherNumBuckets)) =>
-        thisFunction.bucketReducer(numBuckets, otherFunction, otherNumBuckets)
-      case _ => thisFunction.bucketReducer(otherFunction)
+        thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
+      case _ => thisFunction.reducer(otherFunction)
     }
     Option(res)
   }
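One detail worth noting in the method above: the Java-side contract is "null if not reducible", and the final Option(res) is what turns that null into None on the Scala side. A usage sketch, assuming the BucketFunction test implementation shown later in this diff returns null for bucket counts that cannot be coarsened:

// Sketch only. reducer(...) returns a Reducer when 4 buckets can be coarsened
// onto 2, and (by assumption) null in the opposite direction, which
// Option(...) converts to None.
val reducible: Option[Reducer[Int, Int]] =
  Option(BucketFunction.reducer(4, BucketFunction, 2))   // Some(...)
val notReducible: Option[Reducer[Int, Int]] =
  Option(BucketFunction.reducer(2, BucketFunction, 4))   // None, assuming null is returned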

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 3 additions & 3 deletions
@@ -881,9 +881,9 @@ case class KeyGroupedShuffleSpec(

 object KeyGroupedShuffleSpec {
   def reducePartitionValue(
-    row: InternalRow,
-    expressions: Seq[Expression],
-    reducers: Seq[Option[Reducer[_, _]]]):
+      row: InternalRow,
+      expressions: Seq[Expression],
+      reducers: Seq[Option[Reducer[_, _]]]):
   InternalRowComparableWrapper = {
     val partitionVals = row.toSeq(expressions.map(_.dataType))
     val reducedRow = partitionVals.zip(reducers).map{
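The hunk ends just as the zip begins, but the pairing it sets up is simple. Here is a standalone sketch of the value-reduction step, reusing the hypothetical ModReducer from the earlier sketch (the real code wraps the result in an InternalRowComparableWrapper, omitted here):

// Sketch: apply each optional reducer to its matching partition value;
// values whose reducer slot is None pass through unchanged.
val partitionVals: Seq[Any] = Seq(3, 7)
val reducers: Seq[Option[Reducer[Int, Int]]] = Seq(Some(ModReducer(2)), None)
val reducedRow = partitionVals.zip(reducers).map {
  case (v: Int, Some(r)) => r.reduce(v)
  case (v, _)            => v
}
// reducedRow == Seq(1, 7)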

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 2 additions & 2 deletions
@@ -569,8 +569,8 @@ case class EnsureRequirements(
   }

   private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
-    expressions: Seq[Expression],
-    reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
+      expressions: Seq[Expression],
+      reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
     reducers match {
       case Some(reducers) => commonPartValues.groupBy { case (row, _) =>
         KeyGroupedShuffleSpec.reducePartitionValue(row, expressions, reducers)
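Grouping by the reduced partition value is what actually merges partitions: rows that land in the same coarse bucket end up under one key. A toy sketch of the effect, with plain Ints standing in for InternalRows:

// Sketch: four fine-grained bucket values collapse onto two coarse ones
// once each value is reduced modulo 2 (cf. the bucket(4, x) -> bucket(2, x)
// example in ReducibleFunction's Javadoc).
val commonPartValues = Seq((0, 10), (1, 12), (2, 9), (3, 11)) // (bucketValue, size)
val grouped = commonPartValues.groupBy { case (bucketValue, _) => bucketValue % 2 }
// grouped == Map(0 -> Seq((0,10), (2,9)), 1 -> Seq((1,12), (3,11)))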

sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala

Lines changed: 4 additions & 4 deletions
@@ -86,10 +86,10 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int]
     (input.getLong(1) % input.getInt(0)).toInt
   }

-  override def bucketReducer(
-    thisNumBuckets: Int,
-    otherFunc: ReducibleFunction[_, _],
-    otherNumBuckets: Int): Reducer[Int, Int] = {
+  override def reducer(
+      thisNumBuckets: Int,
+      otherFunc: ReducibleFunction[_, _],
+      otherNumBuckets: Int): Reducer[Int, Int] = {

     if (otherFunc == BucketFunction) {
       if ((thisNumBuckets > otherNumBuckets)
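The hunk cuts off mid-condition. For readers following the rename, a hedged reconstruction of the whole method, assuming the usual divisibility rule for coarsening buckets and a modulo-based reducer like the ModReducer sketched earlier (the actual test code may differ):

// Hypothetical completion: reducible only onto the same bucket function,
// with a strictly smaller bucket count that divides this one evenly;
// otherwise null, i.e. not reducible.
override def reducer(
    thisNumBuckets: Int,
    otherFunc: ReducibleFunction[_, _],
    otherNumBuckets: Int): Reducer[Int, Int] = {
  if (otherFunc == BucketFunction &&
      thisNumBuckets > otherNumBuckets &&
      thisNumBuckets % otherNumBuckets == 0) {
    ModReducer(otherNumBuckets)
  } else {
    null
  }
}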
