[SPARK-24386][SS] coalesce(1) aggregates in continuous processing #21560
UnsupportedOperationChecker.scala

@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
              _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
              _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
+            case _ => false
+          }.isDefined
+
+          if (!aboveSinglePartitionCoalesce) {
Contributor: What if there was only a single partition to begin with? Then there's no need for Repartition(1), and this check should be skipped.

Contributor (author): I agree that it wouldn't be needed, but partitioning information is not always available during analysis. So I don't think we can write the more granular check suggested here.

Contributor: Also, if there's a single parent partition and there's a

Contributor (author): (Same comment as above applies here: we don't have partitioning information during analysis.)
+            throwError(s"In continuous processing mode, coalesce(1) must be called before " +
+              s"aggregate operation ${node.nodeName}.")
+          }
         case node =>
           throwError(s"Continuous processing does not support ${node.nodeName} operations.")
       }
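For context, here is a hedged sketch (not part of the diff) of the kind of user query this check is meant to allow once the patch is in. The rate source, console sink, output mode, and trigger interval are illustrative choices; the key point is that `coalesce(1)` lowers to `Repartition(1, shuffle = false)` in the logical plan, which is what the checker looks for underneath the Aggregate node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[4]")
  .appName("coalesce-aggregate-sketch")
  .getOrCreate()

// coalesce(1) before the aggregate satisfies the new analysis check.
val query = spark.readStream
  .format("rate")
  .load()
  .coalesce(1)
  .groupBy()
  .count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```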
DataSourceV2Strategy.scala

@@ -22,11 +22,12 @@ import scala.collection.mutable
 import org.apache.spark.sql.{sources, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader

 object DataSourceV2Strategy extends Strategy {
@@ -141,6 +142,17 @@ object DataSourceV2Strategy {
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil

+    case Repartition(1, false, child) =>
+      val isContinuous = child.collectFirst {
+        case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
+      }.isDefined
Member: Could the judgement of whether the plan is continuous or not be a separate method, so other places can use it?
+
+      if (isContinuous) {
+        ContinuousCoalesceExec(1, planLater(child)) :: Nil
+      } else {
+        Nil
+      }

     case _ => Nil
   }
 }
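As a rough illustration of the reviewer's suggestion, a hedged sketch of what such a shared helper might look like, assuming it lives inside DataSourceV2Strategy where these types are already in scope; the helper name and shape are hypothetical, not part of the patch.

```scala
// Hypothetical helper: factor the "does this subtree read from a continuous
// source?" test out of the Repartition case so other rules could reuse it.
private def hasContinuousSource(plan: LogicalPlan): Boolean = {
  plan.collectFirst {
    case StreamingDataSourceV2Relation(_, _, _, _: ContinuousReader) => ()
  }.isDefined
}

// The new case could then be written as:
//   case Repartition(1, false, child) if hasContinuousSource(child) =>
//     ContinuousCoalesceExec(1, planLater(child)) :: Nil
```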
ContinuousCoalesceExec.scala (new file)

@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming.continuous

import java.util.UUID

import org.apache.spark.{HashPartitioner, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD}

/**
 * Physical plan for coalescing a continuous processing plan.
 *
 * Currently, only coalescing to a single partition is supported. `numPartitions` must be 1.
 */
case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
  override def output: Seq[Attribute] = child.output

  override def children: Seq[SparkPlan] = child :: Nil

  override def outputPartitioning: Partitioning = SinglePartition

  override def doExecute(): RDD[InternalRow] = {
    assert(numPartitions == 1)
    new ContinuousCoalesceRDD(
      sparkContext,
      numPartitions,
      conf.continuousStreamingExecutorQueueSize,
      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_INTERVAL_KEY).toLong,
      child.execute())
  }
}
ContinuousCoalesceRDD.scala (new file)

@@ -0,0 +1,136 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming.continuous

import java.util.UUID

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.continuous.shuffle._
import org.apache.spark.util.ThreadUtils

case class ContinuousCoalesceRDDPartition(
    index: Int,
    endpointName: String,
    queueSize: Int,
    numShuffleWriters: Int,
    epochIntervalMs: Long)
  extends Partition {
  // Initialized only on the executor, and only once even as we call compute() multiple times.
  lazy val (reader: ContinuousShuffleReader, endpoint) = {
    val env = SparkEnv.get.rpcEnv
    val receiver = new RPCContinuousShuffleReader(
      queueSize, numShuffleWriters, epochIntervalMs, env)
    val endpoint = env.setupEndpoint(endpointName, receiver)

    TaskContext.get().addTaskCompletionListener { ctx =>
      env.stop(endpoint)
    }
    (receiver, endpoint)
  }

  // This flag will be flipped on the executors to indicate that the threads processing
  // partitions of the write-side RDD have been started. These will run indefinitely
  // asynchronously as epochs of the coalesce RDD complete on the read side.
  private[continuous] var writersInitialized: Boolean = false
}

/**
 * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
 * continuous shuffle, and then reads them in the task thread using `reader`.
 */
class ContinuousCoalesceRDD(
    context: SparkContext,
    numPartitions: Int,
    readerQueueSize: Int,
    epochIntervalMs: Long,
    prev: RDD[InternalRow])
  extends RDD[InternalRow](context, Nil) {

  // When we support more than 1 target partition, we'll need to figure out how to pass in the
  // required partitioner.
  private val outputPartitioner = new HashPartitioner(1)

  private val readerEndpointNames = (0 until numPartitions).map { i =>
    s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
  }

  override def getPartitions: Array[Partition] = {
    (0 until numPartitions).map { partIndex =>
      ContinuousCoalesceRDDPartition(
        partIndex,
        readerEndpointNames(partIndex),
        readerQueueSize,
        prev.getNumPartitions,
        epochIntervalMs)
    }.toArray
  }

  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
    prev.getNumPartitions,
    this.name)

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val part = split.asInstanceOf[ContinuousCoalesceRDDPartition]

    if (!part.writersInitialized) {
      val rpcEnv = SparkEnv.get.rpcEnv

      // trigger lazy initialization
      part.endpoint
      val endpointRefs = readerEndpointNames.map { endpointName =>
        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
      }

      val runnables = prev.partitions.map { prevSplit =>
        new Runnable() {
          override def run(): Unit = {
            TaskContext.setTaskContext(context)

            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
              prevSplit.index, outputPartitioner, endpointRefs.toArray)

            EpochTracker.initializeCurrentEpoch(
              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
            while (!context.isInterrupted() && !context.isCompleted()) {
              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
              // Note that current epoch is a non-inheritable thread local, so each writer thread
              // can properly increment its own epoch without affecting the main task thread.
              EpochTracker.incrementCurrentEpoch()
            }
          }
        }
      }

      context.addTaskCompletionListener { ctx =>
        threadPool.shutdownNow()
      }

      part.writersInitialized = true

      runnables.foreach(threadPool.execute)
    }

    part.reader.read()
  }

  override def clearDependencies(): Unit = {
    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
  }
}
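Not part of the patch, but for readers unfamiliar with the shape used above, here is a minimal, framework-free sketch of "one writer thread per upstream partition feeding a blocking reader on the task thread". All names are illustrative, and the real implementation uses Spark's RPC-based continuous shuffle rather than a local queue.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object CoalesceShapeSketch {
  def main(args: Array[String]): Unit = {
    val numWriters = 4
    val rowsPerWriter = 3
    // Stand-in for the continuous shuffle: a bounded queue shared by all writers.
    val queue = new LinkedBlockingQueue[String](1024)
    val pool = Executors.newFixedThreadPool(numWriters)

    // "Write side": one thread per upstream partition pushes rows into the queue.
    (0 until numWriters).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit =
          (0 until rowsPerWriter).foreach(n => queue.put(s"writer-$i row-$n"))
      })
    }

    // "Read side": the task thread drains everything the writers produced.
    (0 until numWriters * rowsPerWriter).foreach(_ => println(queue.take()))

    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```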
ContinuousShuffleReadRDD.scala

@@ -21,12 +21,14 @@ import java.util.UUID
 import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.NextIterator

 case class ContinuousShuffleReadPartition(
     index: Int,
+    endpointName: String,
     queueSize: Int,
     numShuffleWriters: Int,
     epochIntervalMs: Long)
@@ -36,7 +38,7 @@ case class ContinuousShuffleReadPartition(
     val env = SparkEnv.get.rpcEnv
     val receiver = new RPCContinuousShuffleReader(
       queueSize, numShuffleWriters, epochIntervalMs, env)
-    val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver)
+    val endpoint = env.setupEndpoint(endpointName, receiver)

     TaskContext.get().addTaskCompletionListener { ctx =>
       env.stop(endpoint)
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
     numPartitions: Int,
     queueSize: Int = 1024,
     numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
Contributor: Same here: if possible, it might be better to have complete code rather than just working with such an assumption.

Contributor (author): This is just a default argument to make tests less wordy. I can remove it if you think that's best, but it doesn't impose a restriction.
   extends RDD[UnsafeRow](sc, Nil) {

   override protected def getPartitions: Array[Partition] = {
     (0 until numPartitions).map { partIndex =>
-      ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
+      ContinuousShuffleReadPartition(
+        partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs)
Contributor: This is effectively asserting that numPartitions is 1; otherwise it will throw an exception.
     }.toArray
   }
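To make the reviewer's point concrete, a hedged test-style sketch (assuming an existing SparkContext `sc`): when reading more than one partition, the caller would need to supply one endpoint name per partition, since the default is a single-element Seq.

```scala
import java.util.UUID

val numPartitions = 3
// One endpoint name per partition; the names here follow the existing
// convention but are otherwise arbitrary.
val endpointNames = (0 until numPartitions).map { i =>
  s"RPCContinuousShuffleReader-part$i-${UUID.randomUUID()}"
}
val rdd = new ContinuousShuffleReadRDD(
  sc, numPartitions, endpointNames = endpointNames)
```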
What if we have multiple repartitions, where one meets the case and the others don't? I'm not sure we are restricting repartition operations to happen only once.

I don't think there's any particular reason we need to. There's no reason we couldn't execute multiple repartitions if the optimizer isn't smart enough to combine them.

Oh wait, I see what you mean. Repartition(5, ...) would never be matched by this rule, since it only applies to Aggregate.