-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
《4.1 Executor 端长时容错详解.md》讨论区 #11
Comments
您好,我想问一下,如果要是想catch spark Exception,应该怎么catch
这个应该怎么catch才能让application不退出呢
您好,我想问下,文章中关于 细粒度忽略 的修改原生Spark Streaming的代码在哪里呀
确实第一张图不对。你找的这张图(Spark 1.2 版本)结构上是正确的,除了 Spark 1.2 以来有些类的名字对不上了需要修改下 —— 稍后我把图给更新一下。Thanks for bringing this up!
@lw-lin |
主要是修改 BlockRDD.scala,修改后整个文件源码如下: /*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.storage.{BlockId, BlockManager, StreamBlockId}
import scala.Some
/** Partition backed by a single block; `idx` is its position within the parent RDD. */
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  override val index: Int = idx
}
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {

  // Block -> host locations, resolved lazily on first access via the block manager master.
  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)

  // Flipped to false once removeBlocks() has irreversibly deleted the backing data.
  @volatile private var _isValid = true

  /** One partition per block, in the same order as `blockIds`. */
  override def getPartitions: Array[Partition] = {
    assertValid()
    blockIds.indices.map { i =>
      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
    }.toArray
  }

  /**
   * Reads the block backing `split` from the BlockManager.
   *
   * Modification vs. upstream Spark: when the block is missing AND the
   * `spark._.enableBlockRddMissingIgnoreFeature` flag is on AND the block is a
   * streaming-receiver block ([[StreamBlockId]]) AND this is the task's last
   * allowed attempt, return an empty iterator instead of failing the task.
   * This keeps a long-running streaming application alive at the cost of
   * silently dropping that block's records (the loss is logged as an error).
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        //=========================modification begin ======================
        // Config lookups happen only on the miss path, so the common
        // block-found case pays no extra cost per compute() call.
        val conf = SparkEnv.get.conf
        val enableBlockRddMissingIgnoreFeature =
          conf.getBoolean("spark._.enableBlockRddMissingIgnoreFeature", false)
        val defaultTaskRetryNum = conf.getInt("spark.task.maxFailures", 4)
        // Never wait longer than the scheduler's own retry budget.
        val maxRetryNum =
          conf.getInt("spark._.blockRddMissingMaxRetryNum", 4).min(defaultTaskRetryNum)
        // Only receiver-generated stream blocks may be dropped; block loss for
        // any other source still fails the task.
        val isStreamBlock = blockId.isInstanceOf[StreamBlockId]
        // attemptNumber() is 0-based, so the final allowed attempt is maxRetryNum - 1.
        val shouldIgnore = context.attemptNumber() >= maxRetryNum - 1
        if (enableBlockRddMissingIgnoreFeature && isStreamBlock && shouldIgnore) {
          logError("block missing : " + blockId)
          Iterator.empty
        } else {
          // SparkException (not a bare Exception) so callers can catch Spark's
          // standard failure type, consistent with assertValid() below.
          throw new SparkException(
            s"Could not compute split, block $blockId of RDD $id not found")
        }
        //=========================modification end ======================
    }
  }

  /** Preferred hosts for a partition are those known to hold its block. */
  override def getPreferredLocations(split: Partition): Seq[String] = {
    assertValid()
    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
  }

  /**
   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
   * irreversible operation, as the data in the blocks cannot be recovered back
   * once removed. Use it with caution.
   */
  private[spark] def removeBlocks() {
    blockIds.foreach { blockId =>
      sparkContext.env.blockManager.master.removeBlock(blockId)
    }
    _isValid = false
  }

  /**
   * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
   * removed using `this.removeBlocks`.
   */
  private[spark] def isValid: Boolean = {
    _isValid
  }

  /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
  private[spark] def assertValid() {
    if (!isValid) {
      throw new SparkException(
        "Attempted to use %s after its blocks have been removed!".format(toString))
    }
  }

  /** Exposes the lazily computed block -> host locations map to subclasses. */
  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
    _locations
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
这里是 《4.1 Executor 端长时容错详解.md》 讨论区。
如需要贴代码,请复制以下内容并修改:
谢谢!
The text was updated successfully, but these errors were encountered: