Commit e88351a

Merge remote-tracking branch 'upstream/master' into SPARK-29947
# Conflicts:
#   sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

2 parents: 2e1f87d + e04309c

File tree

144 files changed: +2617 −1713 lines


core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html

Lines changed: 11 additions & 14 deletions
@@ -71,13 +71,10 @@ <h4 class="title-table">Executors</h4>
   <table id="active-executors-table" class="table table-striped compact cell-border">
     <thead>
       <tr>
-        <th>
-          <span data-toggle="tooltip" data-placement="top" title="ID of the executor">Executor ID</span></th>
-        <th>
-          <span data-toggle="tooltip" data-placement="top" title="Address">Address</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Status">Status</span></th>
-        <th>
-          <span data-toggle="tooltip" data-placement="top" title="RDD Blocks">RDD Blocks</span></th>
+        <th>Executor ID</th>
+        <th>Address</th>
+        <th>Status</th>
+        <th>RDD Blocks</th>
         <th>
           <span data-toggle="tooltip" data-placement="top"
             title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
@@ -90,13 +87,13 @@ <h4 class="title-table">Executors</h4>
           <span data-toggle="tooltip"
             title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
             Off Heap Storage Memory</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Resources">Resources</span></th>
+        <th>Disk Used</th>
+        <th>Cores</th>
+        <th>Resources</th>
         <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
         <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Complete Tasks">Complete Tasks</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Total Tasks">Total Tasks</span></th>
+        <th>Complete Tasks</th>
+        <th>Total Tasks</th>
         <th>
           <scan data-toggle="tooltip" data-placement="top"
             title="Shaded red when garbage collection (GC) time is over 10% of task time">
@@ -113,8 +110,8 @@ <h4 class="title-table">Executors</h4>
           <span data-toggle="tooltip" data-placement="top"
             title="Bytes and records written to disk in order to be read by a shuffle in a future stage.">
             Shuffle Write</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Logs">Logs</span></th>
-        <th><span data-toggle="tooltip" data-placement="top" title="Thread Dump">Thread Dump</span></th>
+        <th>Logs</th>
+        <th>Thread Dump</th>
       </tr>
     </thead>
     <tbody>

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 22 additions & 3 deletions
@@ -27,6 +27,9 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Network
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util._
 
@@ -199,14 +202,30 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       if (now - lastSeenMs > executorTimeoutMs) {
         logWarning(s"Removing executor $executorId with no recent heartbeats: " +
           s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-          s"timed out after ${now - lastSeenMs} ms"))
-        // Asynchronously kill the executor to avoid blocking the current thread
+        // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             // Note: we want to get an executor back after expiring this one,
             // so do not simply call `sc.killExecutor` here (SPARK-8119)
             sc.killAndReplaceExecutor(executorId)
+            // SPARK-27348: in case of the executors which are not gracefully shut down,
+            // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+            // here to guarantee two things:
+            // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+            //    a lost executor instead of waiting for disconnect message
+            // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
+            //    those executors to avoid app hang
+            sc.schedulerBackend match {
+              case backend: CoarseGrainedSchedulerBackend =>
+                backend.driverEndpoint.send(RemoveExecutor(executorId,
+                  SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+
+              // LocalSchedulerBackend is used locally and only has one single executor
+              case _: LocalSchedulerBackend =>
+
+              case other => throw new UnsupportedOperationException(
+                s"Unknown scheduler backend: ${other.getClass}")
+            }
           }
         })
         executorLastSeen.remove(executorId)

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 2 deletions
@@ -255,8 +255,10 @@ private[spark] class EventLoggingListener(
     // ...
     // where jvmInformation, sparkProperties, etc. are sequence of tuples.
     // We go through the various of properties and redact sensitive information from them.
-    val redactedProps = event.environmentDetails.map{ case (name, props) =>
-      name -> Utils.redact(sparkConf, props)
+    val noRedactProps = Seq("Classpath Entries")
+    val redactedProps = event.environmentDetails.map {
+      case (name, props) if noRedactProps.contains(name) => name -> props
+      case (name, props) => name -> Utils.redact(sparkConf, props)
     }
     SparkListenerEnvironmentUpdate(redactedProps)
   }
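
For context, a minimal self-contained sketch of the same skip-list redaction pattern used in the hunk above. RedactionSketch, secretPattern, and the sample properties are hypothetical stand-ins, not Spark's Utils.redact:

object RedactionSketch {
  // Hypothetical stand-in for Spark's Utils.redact: mask values whose key looks sensitive.
  private val secretPattern = "(?i)secret|password|token".r

  private def redact(props: Seq[(String, String)]): Seq[(String, String)] =
    props.map {
      case (k, _) if secretPattern.findFirstIn(k).isDefined => k -> "*********(redacted)"
      case kv => kv
    }

  def main(args: Array[String]): Unit = {
    val environmentDetails: Map[String, Seq[(String, String)]] = Map(
      "Spark Properties" -> Seq("spark.hadoop.secret.key" -> "hunter2", "spark.app.name" -> "demo"),
      "Classpath Entries" -> Seq("/opt/spark/jars/example.jar" -> "System Classpath"))

    // Same shape as the patch: property groups on the skip-list pass through untouched.
    val noRedactProps = Seq("Classpath Entries")
    val redactedProps = environmentDetails.map {
      case (name, props) if noRedactProps.contains(name) => name -> props
      case (name, props) => name -> redact(props)
    }
    redactedProps.foreach(println)
  }
}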

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 16 additions & 3 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.status
 import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
 import scala.collection.immutable.{HashSet, TreeSet}
 import scala.collection.mutable.HashMap
 
@@ -625,10 +626,22 @@ private class SchedulerPool(name: String) extends LiveEntity {
 
 }
 
-private object LiveEntityHelpers {
+private[spark] object LiveEntityHelpers {
 
   private val stringInterner = Interners.newWeakInterner[String]()
 
+  private def accuValuetoString(value: Any): String = value match {
+    case list: java.util.List[_] =>
+      // SPARK-30379: For collection accumulator, string representation might
+      // takes much more memory (e.g. long => string of it) and cause OOM.
+      // So we only show first few elements.
+      if (list.size() > 5) {
+        list.asScala.take(5).mkString("[", ",", "," + "... " + (list.size() - 5) + " more items]")
+      } else {
+        list.toString
+      }
+    case _ => value.toString
+  }
 
   def newAccumulatorInfos(accums: Iterable[AccumulableInfo]): Seq[v1.AccumulableInfo] = {
     accums
@@ -641,8 +654,8 @@ private object LiveEntityHelpers {
       new v1.AccumulableInfo(
         acc.id,
         acc.name.map(weakIntern).orNull,
-        acc.update.map(_.toString()),
-        acc.value.map(_.toString()).orNull)
+        acc.update.map(accuValuetoString),
+        acc.value.map(accuValuetoString).orNull)
       }
       .toSeq
   }
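
A standalone sketch of the truncation rule introduced above, showing the resulting string format. The object name AccumulatorValueSketch is hypothetical; only the take(5) / "... N more items" behaviour mirrors the patch:

import java.util.Arrays
import scala.collection.JavaConverters._

object AccumulatorValueSketch {
  // Show at most five elements of a java.util.List, then summarise the remainder,
  // so huge collection accumulators keep a small string representation.
  def accuValueToString(value: Any): String = value match {
    case list: java.util.List[_] if list.size() > 5 =>
      list.asScala.take(5).mkString("[", ",", "," + "... " + (list.size() - 5) + " more items]")
    case list: java.util.List[_] => list.toString
    case other => other.toString
  }

  def main(args: Array[String]): Unit = {
    println(accuValueToString(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))) // [1,2,3,4,5,... 5 more items]
    println(accuValueToString(42L)) // 42
  }
}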

core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala

Lines changed: 11 additions & 1 deletion
@@ -25,6 +25,8 @@ import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkContext}
+import org.apache.spark.status.api.v1
+import org.apache.spark.util.Utils
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AbstractApplicationResource extends BaseAppResource {
@@ -97,7 +99,15 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
 
   @GET
   @Path("environment")
-  def environmentInfo(): ApplicationEnvironmentInfo = withUI(_.store.environmentInfo())
+  def environmentInfo(): ApplicationEnvironmentInfo = withUI { ui =>
+    val envInfo = ui.store.environmentInfo()
+    new v1.ApplicationEnvironmentInfo(
+      envInfo.runtime,
+      Utils.redact(ui.conf, envInfo.sparkProperties),
+      Utils.redact(ui.conf, envInfo.hadoopProperties),
+      Utils.redact(ui.conf, envInfo.systemProperties),
+      envInfo.classpathEntries)
+  }
 
   @GET
   @Path("logs")

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 7 additions & 1 deletion
@@ -26,6 +26,7 @@ import scala.concurrent.duration._
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, spy, verify, when}
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
@@ -153,7 +154,6 @@ class HeartbeatReceiverSuite
     heartbeatReceiverClock.advance(executorTimeout)
     heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
     // Only the second executor should be expired as a dead host
-    verify(scheduler).executorLost(meq(executorId2), any())
     val trackedExecutors = getTrackedExecutors
     assert(trackedExecutors.size === 1)
     assert(trackedExecutors.contains(executorId1))
@@ -209,6 +209,12 @@ class HeartbeatReceiverSuite
     // explicitly request new executors. For more detail, see SPARK-8119.
     assert(fakeClusterManager.getTargetNumExecutors === 2)
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
+    // [SPARK-27348] HeartbeatReceiver should remove lost executor from scheduler backend
+    eventually(timeout(5.seconds)) {
+      assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId1))
+      assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId2))
+    }
+    fakeSchedulerBackend.stop()
   }
 
   /** Manually send a heartbeat and return the response. */

core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala

Lines changed: 38 additions & 14 deletions
@@ -18,12 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.util.Date
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.reflect.ClassTag
@@ -97,13 +97,40 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 }
 
-class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
-  extends MockWorker(master, conf) {
+// This class is designed to handle the lifecycle of only one application.
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+  extends MockWorker(master.self, conf) with Eventually {
+
+  val appRegistered = new CountDownLatch(1)
+  val launchExecutorReceived = new CountDownLatch(1)
+  val appIdsToLaunchExecutor = new mutable.HashSet[String]
   var failedCnt = 0
+
   override def receive: PartialFunction[Any, Unit] = {
+    case LaunchDriver(driverId, _, _) =>
+      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+      // Below code doesn't make driver stuck, as newDriver opens another rpc endpoint for
+      // handling driver related messages. To simplify logic, we will block handling
+      // LaunchExecutor message until we validate registering app succeeds.
+      eventually(timeout(5.seconds)) {
+        // an app would be registered with Master once Driver set up
+        assert(apps.nonEmpty)
+        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+      }
+
+      appRegistered.countDown()
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+      assert(appRegistered.await(10, TimeUnit.SECONDS))
+
+      if (failedCnt == 0) {
+        launchExecutorReceived.countDown()
+      }
+      assert(master.idToApp.contains(appId))
+      appIdsToLaunchExecutor += appId
       failedCnt += 1
-      master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+      master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+
     case otherMsg => super.receive(otherMsg)
   }
 }
@@ -662,7 +689,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeAliveMaster()
     var worker: MockExecutorLaunchFailWorker = null
    try {
-      worker = new MockExecutorLaunchFailWorker(master.self)
+      worker = new MockExecutorLaunchFailWorker(master)
      worker.rpcEnv.setupEndpoint("worker", worker)
      val workerRegMsg = RegisterWorker(
        worker.id,
@@ -677,19 +704,16 @@ class MasterSuite extends SparkFunSuite
      val driver = DeployTestUtils.createDriverDesc()
      // mimic DriverClient to send RequestSubmitDriver to master
      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-      var appId: String = null
-      eventually(timeout(10.seconds)) {
-        // an app would be registered with Master once Driver set up
-        assert(worker.apps.nonEmpty)
-        appId = worker.apps.head._1
-        assert(master.idToApp.contains(appId))
-      }
+
+      // LaunchExecutor message should have been received in worker side
+      assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 
      eventually(timeout(10.seconds)) {
+        val appIds = worker.appIdsToLaunchExecutor
        // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
        assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
        // Master would remove the app if no executor could be launched for it
-        assert(!master.idToApp.contains(appId))
+        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
      }
    } finally {
      if (worker != null) {

core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala

Lines changed: 14 additions & 0 deletions
@@ -17,8 +17,11 @@
 
 package org.apache.spark.status
 
+import java.util.Arrays
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.{AccumulatorMetadata, CollectionAccumulator}
 
 class LiveEntitySuite extends SparkFunSuite {
 
@@ -52,6 +55,17 @@ class LiveEntitySuite extends SparkFunSuite {
     assert(!seq.exists(_.blockName == items(5).blockName))
   }
 
+  test("Only show few elements of CollectionAccumulator when converting to v1.AccumulableInfo") {
+    val acc = new CollectionAccumulator[Int]()
+    val value = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    acc.setValue(value)
+    acc.metadata = AccumulatorMetadata(0L, None, false)
+    val accuInfo = LiveEntityHelpers
+      .newAccumulatorInfos(Seq(acc.toInfo(Some(acc.value), Some(acc.value))))(0)
+    assert(accuInfo.update.get == "[1,2,3,4,5,... 5 more items]")
+    assert(accuInfo.value == "[1,2,3,4,5,... 5 more items]")
+  }
+
   private def checkSize(seq: Seq[_], expected: Int): Unit = {
     assert(seq.length === expected)
     var count = 0

docs/_data/menu-sql.yaml

Lines changed: 2 additions & 0 deletions
@@ -172,6 +172,8 @@
     url: sql-ref-syntax-aux-cache-clear-cache.html
   - text: REFRESH TABLE
     url: sql-ref-syntax-aux-refresh-table.html
+  - text: REFRESH
+    url: sql-ref-syntax-aux-cache-refresh.md
   - text: Describe Commands
     url: sql-ref-syntax-aux-describe.html
     subitems:

docs/sql-data-sources-avro.md

Lines changed: 16 additions & 12 deletions
@@ -198,9 +198,22 @@ Data source options of Avro can be set via:
   <tr>
     <td><code>avroSchema</code></td>
     <td>None</td>
-    <td>Optional Avro schema provided by a user in JSON format. The data type and naming of record fields
-    should match the Avro data type when reading from Avro or match the Spark's internal data type (e.g., StringType, IntegerType) when writing to Avro files; otherwise, the read/write action will fail.</td>
-    <td>read and write</td>
+    <td>Optional schema provided by a user in JSON format.
+      <ul>
+        <li>
+          When reading Avro, this option can be set to an evolved schema, which is compatible but different with
+          the actual Avro schema. The deserialization schema will be consistent with the evolved schema.
+          For example, if we set an evolved schema containing one additional column with a default value,
+          the reading result in Spark will contain the new column too.
+        </li>
+        <li>
+          When writing Avro, this option can be set if the expected output Avro schema doesn't match the
+          schema converted by Spark. For example, the expected schema of one column is of "enum" type,
+          instead of "string" type in the default converted schema.
+        </li>
+      </ul>
+    </td>
+    <td> read, write and function <code>from_avro</code></td>
   </tr>
   <tr>
     <td><code>recordName</code></td>
@@ -240,15 +253,6 @@ Data source options of Avro can be set via:
     </td>
     <td>function <code>from_avro</code></td>
   </tr>
-  <tr>
-    <td><code>actualSchema</code></td>
-    <td>None</td>
-    <td>Optional Avro schema (in JSON format) that was used to serialize the data. This should be set if the schema provided
-    for deserialization is compatible with - but not the same as - the one used to originally convert the data to Avro.
-    For more information on Avro's schema evolution and compatibility, please refer to the [documentation of Confluent](https://docs.confluent.io/current/schema-registry/avro.html).
-    </td>
-    <td>function <code>from_avro</code></td>
-  </tr>
 </table>
 
 ## Configuration
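
A short illustration of where the avroSchema option described above would be set when reading with an evolved schema. This is a sketch only: it assumes the spark-avro module is on the classpath, and the schema string and input path are hypothetical:

import org.apache.spark.sql.SparkSession

object AvroSchemaOptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("avro-schema-option").getOrCreate()

    // Hypothetical evolved schema: one extra field with a default value, compatible with the data on disk.
    val evolvedSchema =
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "note", "type": "string", "default": "n/a"}
        |]}""".stripMargin

    // Reading with an evolved (compatible) schema; the result includes the new column too.
    val users = spark.read.format("avro")
      .option("avroSchema", evolvedSchema)
      .load("/tmp/users.avro") // hypothetical input path

    users.show()
    spark.stop()
  }
}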

docs/sql-migration-guide.md

Lines changed: 3 additions & 1 deletion
@@ -23,6 +23,8 @@ license: |
 {:toc}
 
 ## Upgrading from Spark SQL 2.4 to 3.0
+  - Since Spark 3.0, the permanent function created using resource throws `AnalysisException` if the resource does not exists.
+
   - Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception will be thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are valid `Cast`. When inserting an out-of-range value to a integral field, the low-order bits of the value is inserted(the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value as "ANSI". Setting the option as "Legacy" restores the previous behavior.
 
   - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
@@ -93,7 +95,7 @@ license: |
 
   - Since Spark 3.0, if `hive.default.fileformat` is not found in `Spark SQL configuration` then it will fallback to hive-site.xml present in the `Hadoop configuration` of `SparkContext`.
 
-  - Since Spark 3.0, Spark will cast `String` to `Date/TimeStamp` in binary comparisons with dates/timestamps. The previous behaviour of casting `Date/Timestamp` to `String` can be restored by setting `spark.sql.legacy.typeCoercion.datetimeToString` to `true`.
+  - Since Spark 3.0, Spark will cast `String` to `Date/TimeStamp` in binary comparisons with dates/timestamps. The previous behaviour of casting `Date/Timestamp` to `String` can be restored by setting `spark.sql.legacy.typeCoercion.datetimeToString.enabled` to `true`.
 
   - Since Spark 3.0, when Avro files are written with user provided schema, the fields will be matched by field names between catalyst schema and avro schema instead of positions.
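
A quick sketch of toggling the renamed legacy flag from the hunk above. The object name, literals, and the assumption that the flag can be set at session level are illustrative, not taken from the commit:

import org.apache.spark.sql.SparkSession

object DatetimeToStringLegacySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("legacy-datetime-coercion").getOrCreate()

    // Spark 3.0 default: the string side is cast to a date for the comparison.
    spark.sql("SELECT DATE '2019-12-31' > '2019-12-30' AS result").show()

    // Restore the 2.4 behaviour (cast Date/Timestamp to String) via the renamed flag.
    spark.conf.set("spark.sql.legacy.typeCoercion.datetimeToString.enabled", "true")
    spark.sql("SELECT DATE '2019-12-31' > '2019-12-30' AS result").show()

    spark.stop()
  }
}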
