Skip to content

Commit 3d8858a

Browse files
author
madanadit
committed
Merge branch 'branch-2.3' into k8s-volumes
2 parents d9f46d3 + 6adf9ee commit 3d8858a

File tree

13 files changed

+309
-45
lines changed

13 files changed

+309
-45
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ private[yarn] class YarnAllocator(
8181
private val releasedContainers = Collections.newSetFromMap[ContainerId](
8282
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
8383

84-
private val numExecutorsRunning = new AtomicInteger(0)
84+
private val runningExecutors = Collections.newSetFromMap[String](
85+
new ConcurrentHashMap[String, java.lang.Boolean]())
8586

8687
private val numExecutorsStarting = new AtomicInteger(0)
8788

@@ -166,7 +167,7 @@ private[yarn] class YarnAllocator(
166167
clock = newClock
167168
}
168169

169-
def getNumExecutorsRunning: Int = numExecutorsRunning.get()
170+
def getNumExecutorsRunning: Int = runningExecutors.size()
170171

171172
def getNumExecutorsFailed: Int = synchronized {
172173
val endTime = clock.getTimeMillis()
@@ -242,12 +243,11 @@ private[yarn] class YarnAllocator(
242243
* Request that the ResourceManager release the container running the specified executor.
243244
*/
244245
def killExecutor(executorId: String): Unit = synchronized {
245-
if (executorIdToContainer.contains(executorId)) {
246-
val container = executorIdToContainer.get(executorId).get
247-
internalReleaseContainer(container)
248-
numExecutorsRunning.decrementAndGet()
249-
} else {
250-
logWarning(s"Attempted to kill unknown executor $executorId!")
246+
executorIdToContainer.get(executorId) match {
247+
case Some(container) if !releasedContainers.contains(container.getId) =>
248+
internalReleaseContainer(container)
249+
runningExecutors.remove(executorId)
250+
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
251251
}
252252
}
253253

@@ -274,7 +274,7 @@ private[yarn] class YarnAllocator(
274274
"Launching executor count: %d. Cluster resources: %s.")
275275
.format(
276276
allocatedContainers.size,
277-
numExecutorsRunning.get,
277+
runningExecutors.size,
278278
numExecutorsStarting.get,
279279
allocateResponse.getAvailableResources))
280280

@@ -286,7 +286,7 @@ private[yarn] class YarnAllocator(
286286
logDebug("Completed %d containers".format(completedContainers.size))
287287
processCompletedContainers(completedContainers.asScala)
288288
logDebug("Finished processing %d completed containers. Current running executor count: %d."
289-
.format(completedContainers.size, numExecutorsRunning.get))
289+
.format(completedContainers.size, runningExecutors.size))
290290
}
291291
}
292292

@@ -300,9 +300,9 @@ private[yarn] class YarnAllocator(
300300
val pendingAllocate = getPendingAllocate
301301
val numPendingAllocate = pendingAllocate.size
302302
val missing = targetNumExecutors - numPendingAllocate -
303-
numExecutorsStarting.get - numExecutorsRunning.get
303+
numExecutorsStarting.get - runningExecutors.size
304304
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
305-
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
305+
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
306306
s"executorsStarting: ${numExecutorsStarting.get}")
307307

308308
if (missing > 0) {
@@ -502,7 +502,7 @@ private[yarn] class YarnAllocator(
502502
s"for executor with ID $executorId")
503503

504504
def updateInternalState(): Unit = synchronized {
505-
numExecutorsRunning.incrementAndGet()
505+
runningExecutors.add(executorId)
506506
numExecutorsStarting.decrementAndGet()
507507
executorIdToContainer(executorId) = container
508508
containerIdToExecutorId(container.getId) = executorId
@@ -513,7 +513,7 @@ private[yarn] class YarnAllocator(
513513
allocatedContainerToHostMap.put(containerId, executorHostname)
514514
}
515515

516-
if (numExecutorsRunning.get < targetNumExecutors) {
516+
if (runningExecutors.size() < targetNumExecutors) {
517517
numExecutorsStarting.incrementAndGet()
518518
if (launchContainers) {
519519
launcherPool.execute(new Runnable {
@@ -554,7 +554,7 @@ private[yarn] class YarnAllocator(
554554
} else {
555555
logInfo(("Skip launching executorRunnable as running executors count: %d " +
556556
"reached target executors count: %d.").format(
557-
numExecutorsRunning.get, targetNumExecutors))
557+
runningExecutors.size, targetNumExecutors))
558558
}
559559
}
560560
}
@@ -569,7 +569,11 @@ private[yarn] class YarnAllocator(
569569
val exitReason = if (!alreadyReleased) {
570570
// Decrement the number of executors running. The next iteration of
571571
// the ApplicationMaster's reporting thread will take care of allocating.
572-
numExecutorsRunning.decrementAndGet()
572+
containerIdToExecutorId.get(containerId) match {
573+
case Some(executorId) => runningExecutors.remove(executorId)
574+
case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
575+
}
576+
573577
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
574578
containerId,
575579
onHostStr,

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
251251
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
252252
}
253253
handler.updateResourceRequests()
254-
handler.processCompletedContainers(statuses.toSeq)
254+
handler.processCompletedContainers(statuses)
255255
handler.getNumExecutorsRunning should be (0)
256256
handler.getPendingAllocate.size should be (1)
257257
}
258258

259+
test("kill same executor multiple times") {
260+
val handler = createAllocator(2)
261+
handler.updateResourceRequests()
262+
handler.getNumExecutorsRunning should be (0)
263+
handler.getPendingAllocate.size should be (2)
264+
265+
val container1 = createContainer("host1")
266+
val container2 = createContainer("host2")
267+
handler.handleAllocatedContainers(Array(container1, container2))
268+
handler.getNumExecutorsRunning should be (2)
269+
handler.getPendingAllocate.size should be (0)
270+
271+
val executorToKill = handler.executorIdToContainer.keys.head
272+
handler.killExecutor(executorToKill)
273+
handler.getNumExecutorsRunning should be (1)
274+
handler.killExecutor(executorToKill)
275+
handler.killExecutor(executorToKill)
276+
handler.killExecutor(executorToKill)
277+
handler.getNumExecutorsRunning should be (1)
278+
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
279+
handler.updateResourceRequests()
280+
handler.getPendingAllocate.size should be (1)
281+
}
282+
283+
test("process same completed container multiple times") {
284+
val handler = createAllocator(2)
285+
handler.updateResourceRequests()
286+
handler.getNumExecutorsRunning should be (0)
287+
handler.getPendingAllocate.size should be (2)
288+
289+
val container1 = createContainer("host1")
290+
val container2 = createContainer("host2")
291+
handler.handleAllocatedContainers(Array(container1, container2))
292+
handler.getNumExecutorsRunning should be (2)
293+
handler.getPendingAllocate.size should be (0)
294+
295+
val statuses = Seq(container1, container1, container2).map { c =>
296+
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
297+
}
298+
handler.processCompletedContainers(statuses)
299+
handler.getNumExecutorsRunning should be (0)
300+
301+
}
302+
259303
test("lost executor removed from backend") {
260304
val handler = createAllocator(4)
261305
handler.updateResourceRequests()
@@ -272,7 +316,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
272316
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
273317
}
274318
handler.updateResourceRequests()
275-
handler.processCompletedContainers(statuses.toSeq)
319+
handler.processCompletedContainers(statuses)
276320
handler.updateResourceRequests()
277321
handler.getNumExecutorsRunning should be (0)
278322
handler.getPendingAllocate.size should be (2)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.sql.catalyst.analysis.CastSupport
2021
import org.apache.spark.sql.catalyst.expressions._
2122
import org.apache.spark.sql.catalyst.plans._
2223
import org.apache.spark.sql.catalyst.plans.logical._
2324
import org.apache.spark.sql.catalyst.rules._
25+
import org.apache.spark.sql.internal.SQLConf
2426

2527
/**
2628
* Collapse plans consisting of empty local relations generated by [[PruneFilters]].
@@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules._
3234
* - Aggregate with all empty children and at least one grouping expression.
3335
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
3436
*/
35-
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
37+
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
3638
private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
3739
case p: LocalRelation => p.data.isEmpty
3840
case _ => false
@@ -43,7 +45,9 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
4345

4446
// Construct a project list from plan's output, while the value is always NULL.
4547
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
46-
plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
48+
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
49+
50+
override def conf: SQLConf = SQLConf.get
4751

4852
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
4953
case p: Union if p.children.forall(isEmptyLocalRelation) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.plans
1919

2020
import org.apache.spark.sql.catalyst.expressions._
21-
import org.apache.spark.sql.catalyst.trees.TreeNode
21+
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
2222
import org.apache.spark.sql.internal.SQLConf
2323
import org.apache.spark.sql.types.{DataType, StructType}
2424

@@ -103,7 +103,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
103103
var changed = false
104104

105105
@inline def transformExpression(e: Expression): Expression = {
106-
val newE = f(e)
106+
val newE = CurrentOrigin.withOrigin(e.origin) {
107+
f(e)
108+
}
107109
if (newE.fastEquals(e)) {
108110
e
109111
} else {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
2525
import org.apache.spark.sql.catalyst.plans._
2626
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
2727
import org.apache.spark.sql.catalyst.rules.RuleExecutor
28-
import org.apache.spark.sql.types.StructType
28+
import org.apache.spark.sql.types.{IntegerType, StructType}
2929

3030
class PropagateEmptyRelationSuite extends PlanTest {
3131
object Optimize extends RuleExecutor[LogicalPlan] {
@@ -37,7 +37,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
3737
ReplaceIntersectWithSemiJoin,
3838
PushDownPredicate,
3939
PruneFilters,
40-
PropagateEmptyRelation) :: Nil
40+
PropagateEmptyRelation,
41+
CollapseProject) :: Nil
4142
}
4243

4344
object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
@@ -48,7 +49,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
4849
ReplaceExceptWithAntiJoin,
4950
ReplaceIntersectWithSemiJoin,
5051
PushDownPredicate,
51-
PruneFilters) :: Nil
52+
PruneFilters,
53+
CollapseProject) :: Nil
5254
}
5355

5456
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
@@ -79,18 +81,21 @@ class PropagateEmptyRelationSuite extends PlanTest {
7981

8082
(true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
8183
(true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
82-
(true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
84+
(true, false, LeftOuter,
85+
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
8386
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
84-
(true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
87+
(true, false, FullOuter,
88+
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
8589
(true, false, LeftAnti, Some(testRelation1)),
8690
(true, false, LeftSemi, Some(LocalRelation('a.int))),
8791

8892
(false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
8993
(false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
9094
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
9195
(false, true, RightOuter,
92-
Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
93-
(false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
96+
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
97+
(false, true, FullOuter,
98+
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
9499
(false, true, LeftAnti, Some(LocalRelation('a.int))),
95100
(false, true, LeftSemi, Some(LocalRelation('a.int))),
96101

@@ -209,4 +214,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
209214

210215
comparePlans(optimized, correctAnswer)
211216
}
217+
218+
test("propagate empty relation keeps the plan resolved") {
219+
val query = testRelation1.join(
220+
LocalRelation('a.int, 'b.int), UsingJoin(FullOuter, "a" :: Nil), None)
221+
val optimized = Optimize.execute(query.analyze)
222+
assert(optimized.resolved)
223+
}
212224
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.catalyst.dsl.plans
22+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, NamedExpression}
23+
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
24+
import org.apache.spark.sql.types.IntegerType
25+
26+
class QueryPlanSuite extends SparkFunSuite {
27+
28+
test("origin remains the same after mapExpressions (SPARK-23823)") {
29+
CurrentOrigin.setPosition(0, 0)
30+
val column = AttributeReference("column", IntegerType)(NamedExpression.newExprId)
31+
val query = plans.DslLogicalPlan(plans.table("table")).select(column)
32+
CurrentOrigin.reset()
33+
34+
val mappedQuery = query mapExpressions {
35+
case _: Expression => Literal(1)
36+
}
37+
38+
val mappedOrigin = mappedQuery.expressions.apply(0).origin
39+
assert(mappedOrigin == Origin.apply(Some(0), Some(0)))
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources;
19+
20+
import org.apache.spark.annotation.InterfaceStability;
21+
22+
/**
23+
* Exception thrown when the parquet reader finds column type mismatches.
24+
*/
25+
@InterfaceStability.Unstable
26+
public class SchemaColumnConvertNotSupportedException extends RuntimeException {
27+
28+
/**
29+
* Name of the column which cannot be converted.
30+
*/
31+
private String column;
32+
/**
33+
* Physical column type in the actual parquet file.
34+
*/
35+
private String physicalType;
36+
/**
37+
* Logical column type in the parquet schema the parquet reader uses to parse all files.
38+
*/
39+
private String logicalType;
40+
41+
public String getColumn() {
42+
return column;
43+
}
44+
45+
public String getPhysicalType() {
46+
return physicalType;
47+
}
48+
49+
public String getLogicalType() {
50+
return logicalType;
51+
}
52+
53+
public SchemaColumnConvertNotSupportedException(
54+
String column,
55+
String physicalType,
56+
String logicalType) {
57+
super();
58+
this.column = column;
59+
this.physicalType = physicalType;
60+
this.logicalType = logicalType;
61+
}
62+
}

0 commit comments

Comments
 (0)