
Commit

Address comments.
viirya committed Jun 23, 2020
1 parent 0a9223f commit 43c4726
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2993,7 +2993,7 @@ class Dataset[T] private[sql](
 
   private def repartitionByExpression(
       numPartitions: Option[Int],
-      partitionExprs: Column*): Dataset[T] = {
+      partitionExprs: Seq[Column]): Dataset[T] = {
     // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
     // However, we don't want to complicate the semantics of this API method.
     // Instead, let's give users a friendly error message, pointing them to the new method.
@@ -3018,7 +3018,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
-    repartitionByExpression(Some(numPartitions), partitionExprs: _*)
+    repartitionByExpression(Some(numPartitions), partitionExprs)
   }
 
   /**
@@ -3033,12 +3033,12 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(partitionExprs: Column*): Dataset[T] = {
-    repartitionByExpression(None, partitionExprs: _*)
+    repartitionByExpression(None, partitionExprs)
   }
 
   private def repartitionByRange(
       numPartitions: Option[Int],
-      partitionExprs: Column*): Dataset[T] = {
+      partitionExprs: Seq[Column]): Dataset[T] = {
     require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
     val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match {
       case expr: SortOrder => expr
@@ -3068,7 +3068,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
-    repartitionByRange(Some(numPartitions), partitionExprs: _*)
+    repartitionByRange(Some(numPartitions), partitionExprs)
   }
 
   /**
@@ -3090,7 +3090,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartitionByRange(partitionExprs: Column*): Dataset[T] = {
-    repartitionByRange(None, partitionExprs: _*)
+    repartitionByRange(None, partitionExprs)
   }
 
   /**
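Note on the Dataset.scala changes: the public `repartition`/`repartitionByRange` overloads keep their `Column*` varargs signatures, while the private helpers now take `Seq[Column]`. Inside a Scala varargs method the arguments are already collected into a Seq, so the public methods can forward `partitionExprs` directly, and the `: _*` re-expansion at each call site goes away. A minimal, self-contained sketch of the pattern (hypothetical names, not Spark code):

object VarargsForwarding {
  // Inside the body, the varargs parameter `exprs` already has type Seq[String].
  @scala.annotation.varargs
  def publicApi(exprs: String*): Unit = helper(exprs)

  // The private helper takes a Seq directly, so callers need no `: _*` splat.
  private def helper(exprs: Seq[String]): Unit = exprs.foreach(println)

  def main(args: Array[String]): Unit = publicApi("a", "b", "c")
}

Behavior is unchanged; only the private signatures and their call sites differ.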
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -687,7 +687,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.RepartitionByExpression =>
         val canChangeNumParts = r.optNumPartitions.isEmpty
         exchange.ShuffleExchangeExec(
-          r.partitioning, planLater(r.child), canChangeNumPartitions = canChangeNumParts) :: Nil
+          r.partitioning, planLater(r.child), canChangeNumParts) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
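Note on the SparkStrategies.scala change: the local val `canChangeNumParts` already names the flag it holds, so spelling out the parameter name `canChangeNumPartitions =` at the call site added no information; the value is now passed positionally. A tiny sketch of the same simplification (stub types, not Spark code):

object NamedVsPositional {
  case class ShuffleStub(partitioning: String, canChangeNumPartitions: Boolean)

  def main(args: Array[String]): Unit = {
    val canChangeNumParts = true
    val before = ShuffleStub("hash", canChangeNumPartitions = canChangeNumParts) // named
    val after = ShuffleStub("hash", canChangeNumParts) // positional
    println(before == after) // true: both calls construct equal values
  }
}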
