Commit 90549d1

wangyum authored and committed
[HADP-56155] Support scala 2.13 (apache#670)
1 parent 1302775 · commit 90549d1

File tree

59 files changed (+221, -138 lines)


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ abstract private[spark] class ContextCleanupWorker(sc: SparkContext, name: Strin
   @volatile private var stopped = false

   private val cleaningThread = new Thread() {
-    override def run() {
+    override def run(): Unit = {
       keepCleaning()
     }
   }
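
This is the most common change in the commit: Scala 2.13 deprecates procedure syntax (a method body in braces with no "=" or result type), and builds that treat warnings as errors fail on it. A minimal sketch of the rule, with illustrative names:

// Procedure syntax, deprecated in Scala 2.13:
//   def run() { keepCleaning() }

// An explicit ": Unit =" compiles cleanly on both 2.12 and 2.13.
class Cleaner {
  private def keepCleaning(): Unit = println("cleaning")

  private val cleaningThread = new Thread() {
    override def run(): Unit = keepCleaning()
  }
}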

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 3 additions & 3 deletions
@@ -276,7 +276,7 @@ private class ShuffleStatus(
    * Removes all shuffle outputs associated with this host. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists).
    */
-  def removeOutputsOnHost(host: String): Seq[Int] = withWriteLock {
+  def removeOutputsOnHost(host: String): ArrayBuffer[Int] = withWriteLock {
     logDebug(s"Removing outputs for host ${host}")
     val removedMapIds = removeOutputsByFilter(x => x.host == host)
     removeMergeResultsByFilter(x => x.host == host)
@@ -288,7 +288,7 @@ private class ShuffleStatus(
    * remove outputs which are served by an external shuffle server (if one exists), as they are
    * still registered with that execId.
    */
-  def removeOutputsOnExecutor(execId: String): Seq[Int] = withWriteLock {
+  def removeOutputsOnExecutor(execId: String): ArrayBuffer[Int] = withWriteLock {
     logDebug(s"Removing outputs for execId ${execId}")
     removeOutputsByFilter(x => x.executorId == execId)
   }
@@ -297,7 +297,7 @@ private class ShuffleStatus(
    * Removes all shuffle outputs which satisfies the filter. Note that this will also
    * remove outputs which are served by an external shuffle server (if one exists).
    */
-  def removeOutputsByFilter(f: BlockManagerId => Boolean): Seq[Int] = withWriteLock {
+  def removeOutputsByFilter(f: BlockManagerId => Boolean): ArrayBuffer[Int] = withWriteLock {
     val removedMapIds = ArrayBuffer[Int]()
     for (mapIndex <- mapStatuses.indices) {
       val currentMapStatus = mapStatuses(mapIndex)
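
In Scala 2.13 the default scala.Seq is an alias for scala.collection.immutable.Seq, so a mutable ArrayBuffer no longer conforms to a Seq result type as it did in 2.12. This commit widens the signatures to the concrete ArrayBuffer[Int]; copying with .toSeq (as DAGScheduler.scala does below) is the other option. A sketch with illustrative values:

import scala.collection.mutable.ArrayBuffer

val removedMapIds = ArrayBuffer(1, 2, 3)

// 2.12: compiled, because scala.Seq was collection.Seq, a supertype of ArrayBuffer.
// 2.13: type error, because scala.Seq is immutable.Seq.
// val asSeq: Seq[Int] = removedMapIds

// Option taken here: expose the concrete mutable type.
val asBuffer: ArrayBuffer[Int] = removedMapIds

// Alternative: copy into an immutable Seq at the boundary.
val copied: Seq[Int] = removedMapIds.toSeq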

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -2462,7 +2462,7 @@ class SparkContext(config: SparkConf) extends Logging {
     logInfo("Successfully stopped SparkContext")
   }

-  def logStop(name: String)(stop: => Unit) {
+  def logStop(name: String)(stop: => Unit): Unit = {
     logInfo(s"Stopping $name")
     Utils.tryLogNonFatalError {
       stop

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -292,5 +292,5 @@ private[spark] class TaskContextImpl(

   private[spark] override def fetchFailed: Option[FetchFailedException] = _fetchFailedException

-  private[spark] override def getLocalProperties(): Properties = localProperties
+  private[spark] override def getLocalProperties: Properties = localProperties
 }
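
Scala 2.13 warns when an override's parameter lists do not match the member it overrides: an empty-paren getLocalProperties() may no longer override a parameterless declaration once warnings are fatal. A sketch with an illustrative trait:

import java.util.Properties

trait Context {
  def getLocalProperties: Properties // declared without a parameter list
}

class ContextImpl extends Context {
  // 2.12 tolerated the mismatch; 2.13 warns (an error under fatal warnings):
  //   override def getLocalProperties(): Properties = ...

  // Matching the declared shape compiles on both versions.
  override def getLocalProperties: Properties = new Properties()
}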

core/src/main/scala/org/apache/spark/deploy/history/ViewPointFsHistoryProvider.scala

Lines changed: 3 additions & 3 deletions
@@ -219,7 +219,7 @@ private[history] class ViewPointFsHistoryProvider(conf: SparkConf, clock: Clock)
       s"$logName, lastScanSize: $lastScanSize, currentSize: ${fileStatus.getLen()}")
     val task: Future[Unit] = replayExecutor.submit(new Runnable {
       override def run(): Unit = updateEventLog(fileStatus)
-    }, Unit)
+    }, ())
     tasks = tasks :+ task
     pendingReplayTasksCount.addAndGet(1)
   }
@@ -353,7 +353,7 @@ private[history] class ViewPointFsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val task: Future[Unit] = replayExecutor.submit(new Runnable {
         override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
-      }, Unit)
+      }, ())
       Some(task -> entry.getPath)
     } catch {
       // let the iteration over the updated entries break, since an exception on
@@ -653,7 +653,7 @@ private[history] class ViewPointFsHistoryProvider(conf: SparkConf, clock: Clock)
           logInfo(s"Removed ${appId} from favorite list")
         }
       }
-    }, Unit)
+    }, ())
   }

   override def getFavoriteSessions(
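
In the three hunks above, Unit was the companion object of the Unit type, not the unit value; 2.12 quietly discarded it to (), while 2.13 deprecates using the companion as a value, which fails builds with fatal warnings. Since the second argument of ExecutorService.submit(Runnable, T) is the result the returned Future completes with, the unit value () is what was meant. A sketch:

import java.util.concurrent.{Executors, Future}

val replayExecutor = Executors.newFixedThreadPool(1)
val work = new Runnable { override def run(): Unit = println("replaying log") }

// 2.12: compiled; the Unit companion object was value-discarded to ().
// 2.13: deprecated, hence an error under fatal warnings.
// val task: Future[Unit] = replayExecutor.submit(work, Unit)

// Pass the unit value itself; the Future completes with () when run() finishes.
val task: Future[Unit] = replayExecutor.submit(work, ())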

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 1 addition & 1 deletion
@@ -563,7 +563,7 @@ object HadoopMapReduceCommitProtocol extends Logging {
         logInfo(s"Merging data from $from to $to")
         rename(fs, from, to)
       }
-    }, Unit)
+    }, ())
   }

   @throws[IOException]

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
     def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)

     val rddToFilter: RDD[P] = self.partitioner match {
-      case Some(rp: RangePartitioner[K, V]) =>
+      case Some(rp: RangePartitioner[_, _]) =>
         val partitionIndices = (rp.getPartition(lower), rp.getPartition(upper)) match {
           case (l, u) => Math.min(l, u) to Math.max(l, u)
         }
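
Type arguments in a pattern such as rp: RangePartitioner[K, V] are erased at runtime, so the match can only ever test the class; the resulting unchecked-pattern warning fails a fatal-warnings build, which is presumably why the commit replaces the arguments with wildcards. A sketch with an illustrative type:

import scala.collection.mutable.ArrayBuffer

def describe(x: Any): String = x match {
  // Unchecked: [String] is erased and cannot be tested at runtime.
  //   case buf: ArrayBuffer[String] => "strings"

  // Wildcards claim only what erasure can verify: the class itself.
  case buf: ArrayBuffer[_] => s"a buffer with ${buf.size} elements"
  case _ => "something else"
}

// describe(ArrayBuffer(1, 2, 3)) returns "a buffer with 3 elements"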

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSchedulerImpl.scala

Lines changed: 3 additions & 3 deletions
@@ -79,7 +79,7 @@ private[spark] class AnalyticsTaskSchedulerImpl(
   private val executorStatusLock = new ReentrantReadWriteLock()

   override def runningTasksByExecutors: Map[String, Int] = {
-    executorIdToRunningTaskIds.toMap.mapValues(_.size)
+    executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }

   // The set of executors we have on each host; this is used to compute hostsAlive, which
@@ -552,7 +552,7 @@ private[spark] class AnalyticsTaskSchedulerImpl(
       if (tasks.nonEmpty) {
         hasLaunchedTask = true
       }
-      tasks
+      tasks.map(_.toSeq)
     } finally {
       executorStatusLock.readLock().unlock()
     }
@@ -930,7 +930,7 @@ private[spark] class AnalyticsTaskSchedulerImpl(
      abortTimer.cancel()
    }

-    def checkTaskSetHealth() {
+    def checkTaskSetHealth(): Unit = {
       val taskSets = rootPool.getTaskSetQueue
       for (taskSet <- taskSets) {
         if (taskSet.isZombie) {
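
Two distinct 2.13 changes appear above. First, Map.mapValues now returns a lazy MapView rather than a Map, hence the trailing .toMap. Second, tasks holds mutable buffers, which need .map(_.toSeq) to satisfy a Seq-typed result, the same immutable-Seq default discussed under MapOutputTracker.scala. A sketch of the mapValues change, with illustrative data:

val executorIdToRunningTaskIds = Map("exec-1" -> Set(1L, 2L), "exec-2" -> Set(3L))

// 2.12: mapValues returned a (lazy) Map.
// 2.13: it returns a MapView, which is not assignable to Map.
// val counts: Map[String, Int] = executorIdToRunningTaskIds.mapValues(_.size)

// .toMap forces a strict Map; a bare view would also recompute _.size on every
// lookup. (The undeprecated 2.13 spelling is .view.mapValues(_.size).toMap.)
val counts: Map[String, Int] = executorIdToRunningTaskIds.mapValues(_.size).toMap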

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSetManager.scala

Lines changed: 3 additions & 3 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue

-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}
 import scala.math.max
 import scala.util.control.NonFatal

@@ -162,7 +162,7 @@ private[spark] class AnalyticsTaskSetManager(
       index: Int,
       taskLocality: TaskLocality.Value,
       speculative: Boolean,
-      availableResources: Map[String, Seq[String]] = Map.empty): TaskDescription = {
+      availableResources: Map[String, Buffer[String]] = Map.empty): TaskDescription = {
     // Found a task; do some bookkeeping and return a task description
     val task = tasks(index)
     val taskId = sched.newTaskId()
@@ -443,7 +443,7 @@ private[spark] class AnalyticsTaskSetManager(
     }
   }

-  override def getResSufficientSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
+  override def getResSufficientSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
     if (hasMoreComputeResources) {
       val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
       sortedTaskSetQueue += this
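
Two fixes here. availableResources carries mutable buffers, which no longer satisfy Seq[String] under 2.13's immutable-Seq default, so the value type is widened to mutable Buffer; and getResSufficientSortedTaskSetQueue gains empty parens to match the parameter list of the member it overrides (the mirror image of the TaskContextImpl.scala change). A sketch of the first fix, with an illustrative signature:

import scala.collection.mutable

// 2.13: ArrayBuffer values no longer satisfy Map[String, Seq[String]], so the
// parameter is typed with the mutable Buffer supertype instead.
def resourceCount(availableResources: Map[String, mutable.Buffer[String]] = Map.empty): Int =
  availableResources.valuesIterator.map(_.size).sum

// Callers passing ArrayBuffer values keep compiling, with no copying:
// resourceCount(Map("gpu" -> mutable.ArrayBuffer("0", "1"))) returns 2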

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -402,7 +402,7 @@ private[spark] class DAGScheduler(
         lightAccumUpdates += accum
       }
     )
-    (heavyAccumUpdates, lightAccumUpdates, accumUpdatesForDAG)
+    (heavyAccumUpdates.toSeq, lightAccumUpdates.toSeq, accumUpdatesForDAG.toSeq)
   }

   def taskMetricsFromAccumulators(accumUpdates: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
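
Here the commit takes the opposite tack from MapOutputTracker.scala: the declared result type stays Seq, and each accumulator buffer is copied with .toSeq at the return point. On 2.13, ArrayBuffer.toSeq produces an immutable sequence, so callers cannot observe later mutation of the buffers. A sketch with illustrative names:

import scala.collection.mutable.ArrayBuffer

def splitUpdates(updates: Seq[Int]): (Seq[Int], Seq[Int]) = {
  val heavyAccumUpdates = ArrayBuffer[Int]()
  val lightAccumUpdates = ArrayBuffer[Int]()
  updates.foreach { u =>
    if (u > 100) heavyAccumUpdates += u else lightAccumUpdates += u
  }
  // Returning the buffers directly would not satisfy Seq on 2.13;
  // .toSeq copies each into an immutable Seq.
  (heavyAccumUpdates.toSeq, lightAccumUpdates.toSeq)
}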
