[SPARK-4080] Only throw IOException from [write|read][Object|External]
If classes implementing Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, then this results in an unhelpful
"IOException: unexpected exception type" rather than the actual exception that
produced the (de)serialization error.

This patch fixes the problem by adding a utility method that re-wraps any uncaught
exception in IOException (unless it is already an instance of IOException).

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#2932 from JoshRosen/SPARK-4080 and squashes the following commits:

cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].

(cherry picked from commit 6c98c29)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
	core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
	streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
JoshRosen committed Oct 24, 2014
1 parent 59297e9 commit 6c10c27
Showing 29 changed files with 78 additions and 47 deletions.
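The Utils.scala hunk that adds the helper is not expanded in this view. As a rough sketch of what the commit message describes (assuming a by-name block argument, and using a hypothetical SerializationUtils object to stand in for org.apache.spark.util.Utils), the re-wrapping helper could look like this:

import java.io.IOException

import scala.util.control.NonFatal

// Hypothetical stand-in for org.apache.spark.util.Utils; the actual helper added by
// this commit is Utils.tryOrIOException, whose implementation is not shown on this page.
object SerializationUtils {
  /**
   * Run the given block, letting IOExceptions propagate unchanged and wrapping any
   * other non-fatal exception in an IOException that keeps the original as its cause,
   * so that [write|read][Object|External] methods only ever throw IOException.
   */
  def tryOrIOException(block: => Unit): Unit = {
    try {
      block
    } catch {
      case e: IOException => throw e
      case NonFatal(t) => throw new IOException(t)
    }
  }
}

Each changed [write|read][Object|External] method in the hunks below then delegates its body to the helper, e.g. private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() }.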
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils

/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
}

// Called by Java when deserializing an object
-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
-private def writeObject(out: ObjectOutputStream) {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

-private def writeObject(out: ObjectOutputStream) {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}

-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
}

/** Used by the JVM when serializing this object. */
-private def writeObject(out: ObjectOutputStream) {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
@@ -27,7 +27,7 @@ import scala.util.Random
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}

/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -146,13 +146,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](
}

/** Used by the JVM when serializing this object. */
-private def writeObject(out: ObjectOutputStream) {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef

import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.util.Utils

private[spark] class ApplicationInfo(
val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(

init()

-private def readObject(in: java.io.ObjectInputStream): Unit = {
+private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date

import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.util.Utils

private[spark] class DriverInfo(
val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(

init()

-private def readObject(in: java.io.ObjectInputStream): Unit = {
+private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed

-private def readObject(in: java.io.ObjectInputStream) : Unit = {
+private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark._
+import org.apache.spark.util.Utils

private[spark]
class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
override val index: Int = idx

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
s1 = rdd1.partitions(s1Index)
s2 = rdd2.partitions(s2Index)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
) extends CoGroupSplitDep {

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
split = rdd.partitions(splitIndex)
oos.defaultWriteObject()
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
import scala.reflect.ClassTag

import org.apache.spark._
+import org.apache.spark.util.Utils

/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition(
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = parentsIndices.map(rdd.partitions(_))
oos.defaultWriteObject()
@@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def index: Int = slice

@throws(classOf[IOException])
-private def writeObject(out: ObjectOutputStream): Unit = {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {

val sfactory = SparkEnv.get.serializer

@@ -67,7 +67,7 @@ }
}

@throws(classOf[IOException])
-private def readObject(in: ObjectInputStream): Unit = {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {

val sfactory = SparkEnv.get.serializer
sfactory match {
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils

/**
* Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
@@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition(
override def hashCode(): Int = idx

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = rdds.map(_.partitions(index)).toArray
oos.defaultWriteObject()
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils

/**
* Partition for UnionRDD.
@@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag](
override val index: Int = idx

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
parentPartition = rdd.partitions(parentRddPartitionIndex)
oos.defaultWriteObject()
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils

private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition(
def partitions = partitionValues

@throws(classOf[IOException])
-private def writeObject(oos: ObjectOutputStream) {
+private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
partitionValues = rdds.map(rdd => rdd.partitions(idx))
oos.defaultWriteObject()
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -31,7 +32,7 @@ private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes

def this() = this(null, null) // For deserialization only

-def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
location.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
@@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long

def this() = this(null.asInstanceOf[ByteBuffer], null, null)

-override def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {

out.writeInt(valueBytes.remaining);
Utils.writeByteBuffer(valueBytes, out)
@@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
out.writeObject(metrics)
}

-override def readExternal(in: ObjectInput) {
+override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {

val blen = in.readInt()
val byteVal = new Array[Byte](blen)
@@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
new JavaSerializerInstance(counterReset, classLoader)
}

-override def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(counterReset)
}

-override def readExternal(in: ObjectInput) {
+override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
counterReset = in.readInt()
}
}
@@ -62,14 +62,14 @@ class BlockManagerId private (

def nettyPort: Int = nettyPort_

-override def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
out.writeInt(nettyPort_)
}

-override def readExternal(in: ObjectInput) {
+override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
@@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}

import akka.actor.ActorRef

+import org.apache.spark.util.Utils

private[spark] object BlockManagerMessages {
//////////////////////////////////////////////////////////////////////////////////
// Messages from the master to slaves.
@@ -65,7 +67,7 @@

def this() = this(null, null, null, 0, 0, 0) // For deserialization only

-override def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
out.writeUTF(blockId.name)
storageLevel.writeExternal(out)
Expand All @@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages {
out.writeLong(tachyonSize)
}

-override def readExternal(in: ObjectInput) {
+override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
blockManagerId = BlockManagerId(in)
blockId = BlockId(in.readUTF())
storageLevel = StorageLevel(in)
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
@@ -97,12 +98,12 @@
ret
}

-override def writeExternal(out: ObjectOutput) {
+override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeByte(toInt)
out.writeByte(_replication)
}

-override def readExternal(in: ObjectInput) {
+override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val flags = in.readByte()
_useDisk = (flags & 8) != 0
_useMemory = (flags & 4) != 0
@@ -29,7 +29,7 @@ private[spark]
class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer

-private def readObject(in: ObjectInputStream) {
+private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val length = in.readInt()
buffer = ByteBuffer.allocate(length)
var amountRead = 0
@@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
buffer.rewind() // Allow us to read it later
}

-private def writeObject(out: ObjectOutputStream) {
+private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.writeInt(buffer.limit())
if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
throw new IOException("Could not fully write buffer to output stream")