
Commit 42e5de4

Author: Andrew Or (committed)

Merge branch 'master' of github.com:apache/spark into rest

2 parents: 1f1c03f + 4a17122

36 files changed (+2170, -261 lines)


conf/metrics.properties.template

Lines changed: 1 addition & 0 deletions
@@ -87,6 +87,7 @@
 #   period    10            Poll period
 #   unit      seconds       Units of poll period
 #   prefix    EMPTY STRING  Prefix to prepend to metric name
+#   protocol  tcp           Protocol ("tcp" or "udp") to use

 ## Examples
 # Enable JmxSink for all instances by class name

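For reference, a sink section using the new key could look like the following. This is an illustrative sketch, not part of the commit: it assumes the [instance].sink.[name].[option] key pattern used elsewhere in this template, the host and port values are placeholders, and omitting the protocol key keeps the default of "tcp".

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.protocol=udp
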
core/pom.xml

Lines changed: 4 additions & 4 deletions
@@ -198,19 +198,19 @@
       <artifactId>stream</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-jvm</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-json</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-graphite</artifactId>
     </dependency>
     <dependency>

core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala

Lines changed: 7 additions & 2 deletions
@@ -22,7 +22,7 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit

 import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import com.codahale.metrics.graphite.{GraphiteUDP, Graphite, GraphiteReporter}

 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
@@ -38,6 +38,7 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
   val GRAPHITE_KEY_PERIOD = "period"
   val GRAPHITE_KEY_UNIT = "unit"
   val GRAPHITE_KEY_PREFIX = "prefix"
+  val GRAPHITE_KEY_PROTOCOL = "protocol"

   def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))

@@ -66,7 +67,11 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric

   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

-  val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+  val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
+    case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
+    case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
+    case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
+  }

   val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
       .convertDurationsTo(TimeUnit.MILLISECONDS)

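Both Graphite and GraphiteUDP implement the same sender interface, which is why the rest of the sink (the GraphiteReporter setup) is untouched by the diff. Outside of Spark's MetricsSystem, the same selection pattern can be sketched as a standalone snippet. This is a hedged illustration, not the sink's actual code: it assumes the Dropwizard metrics-graphite API (3.1+, where GraphiteUDP exists), and startGraphiteReporter and its default property values are made up for the example.

// Standalone sketch of the protocol-selection pattern introduced in the diff.
import java.net.InetSocketAddress
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}

def startGraphiteReporter(registry: MetricRegistry, props: Properties): GraphiteReporter = {
  val host = props.getProperty("host", "localhost")
  val port = props.getProperty("port", "2003").toInt

  // Same three-way match as the diff: UDP, explicit TCP, or TCP by default.
  val sender = Option(props.getProperty("protocol")).map(_.toLowerCase) match {
    case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
    case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
    case Some(other) => throw new IllegalArgumentException(s"Invalid Graphite protocol: $other")
  }

  // Build and start a reporter around whichever sender was selected.
  val reporter = GraphiteReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build(sender)
  reporter.start(10, TimeUnit.SECONDS)
  reporter
}
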
core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 7 additions & 7 deletions
@@ -604,8 +604,8 @@ abstract class RDD[T: ClassTag](
    * print line function (like out.println()) as the 2nd parameter.
    * An example of pipe the RDD data of groupBy() in a streaming way,
    * instead of constructing a huge String to concat all the elements:
-   * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
-   *   for (e <- record._2){f(e)}
+   * def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
+   *   for (e &lt;- record._2){f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
    * @return the result RDD
    */
@@ -841,7 +841,7 @@ abstract class RDD[T: ClassTag](
    * Return an RDD with the elements from `this` that are not in `other`.
    *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
+   * RDD will be &lt;= us.
    */
   def subtract(other: RDD[T]): RDD[T] =
     subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
@@ -1027,7 +1027,7 @@ abstract class RDD[T: ClassTag](
    *
    * Note that this method should only be used if the resulting map is expected to be small, as
    * the whole thing is loaded into the driver's memory.
-   * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
+   * To handle very large results, consider using rdd.map(x =&gt; (x, 1L)).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
   def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
@@ -1065,7 +1065,7 @@ abstract class RDD[T: ClassTag](
    * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
    * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
    *
-   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp &gt; p`
    * would trigger sparse representation of registers, which may reduce the memory consumption
    * and increase accuracy when the cardinality is small.
    *
@@ -1383,7 +1383,7 @@ abstract class RDD[T: ClassTag](

   /**
    * Private API for changing an RDD's ClassTag.
-   * Used for internal Java <-> Scala API compatibility.
+   * Used for internal Java-Scala API compatibility.
    */
   private[spark] def retag(cls: Class[T]): RDD[T] = {
     val classTag: ClassTag[T] = ClassTag.apply(cls)
@@ -1392,7 +1392,7 @@ abstract class RDD[T: ClassTag](

   /**
    * Private API for changing an RDD's ClassTag.
-   * Used for internal Java <-> Scala API compatibility.
+   * Used for internal Java-Scala API compatibility.
    */
   private[spark] def retag(implicit classTag: ClassTag[T]): RDD[T] = {
     this.mapPartitions(identity, preservesPartitioning = true)(classTag)

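These hunks only rewrite scaladoc (escaping characters like > and <- that break generated docs), but the alternative the countByValue() comment describes is worth spelling out. A minimal local-mode sketch; the SparkContext setup and the sample data are illustrative, not part of the commit:

// Keep counts distributed instead of collecting a Map onto the driver.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("countByValueSketch"))
val words = sc.parallelize(Seq("a", "b", "a", "c", "a"))

// Small result: fine to materialize on the driver as a Map[T, Long].
val counts: scala.collection.Map[String, Long] = words.countByValue()

// Potentially huge result: stay distributed as an RDD[(String, Long)].
val distributedCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
distributedCounts.collect().foreach(println)

sc.stop()
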
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala

Lines changed: 15 additions & 5 deletions
@@ -27,7 +27,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.ByteBufferInputStream
 import org.apache.spark.util.Utils

-private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
+private[spark] class JavaSerializationStream(
+    out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
   extends SerializationStream {
   private val objOut = new ObjectOutputStream(out)
   private var counter = 0
@@ -39,7 +40,12 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
    * the stream 'resets' object class descriptions have to be re-written)
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
-    objOut.writeObject(t)
+    try {
+      objOut.writeObject(t)
+    } catch {
+      case e: NotSerializableException if extraDebugInfo =>
+        throw SerializationDebugger.improveException(t, e)
+    }
     counter += 1
     if (counterReset > 0 && counter >= counterReset) {
       objOut.reset()
@@ -64,7 +70,8 @@ extends DeserializationStream {
 }


-private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader)
+private[spark] class JavaSerializerInstance(
+    counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
   extends SerializerInstance {

   override def serialize[T: ClassTag](t: T): ByteBuffer = {
@@ -88,7 +95,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade
   }

   override def serializeStream(s: OutputStream): SerializationStream = {
-    new JavaSerializationStream(s, counterReset)
+    new JavaSerializationStream(s, counterReset, extraDebugInfo)
   }

   override def deserializeStream(s: InputStream): DeserializationStream = {
@@ -111,17 +118,20 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade
 @DeveloperApi
 class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
   private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
+  private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)

   override def newInstance(): SerializerInstance = {
     val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
-    new JavaSerializerInstance(counterReset, classLoader)
+    new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
   }

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeInt(counterReset)
+    out.writeBoolean(extraDebugInfo)
   }

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     counterReset = in.readInt()
+    extraDebugInfo = in.readBoolean()
   }
 }
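
The net effect of this change: when spark.serializer.extraDebugInfo is true (the default per the diff), a NotSerializableException thrown inside writeObject is rethrown with extra context from SerializationDebugger.improveException, and the flag is carried through writeExternal/readExternal so serialized copies of the serializer keep the setting. Below is a minimal, self-contained sketch of the same guard pattern; describeFailure is a hypothetical stand-in for Spark's SerializationDebugger and only appends the offending object's class to the message.

// Sketch of the guarded-write pattern, independent of Spark's classes.
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

def describeFailure(obj: Any, cause: NotSerializableException): NotSerializableException = {
  // Hypothetical helper: enrich the message with the class that failed to serialize.
  val improved = new NotSerializableException(
    s"${cause.getMessage} (while serializing an instance of ${obj.getClass.getName})")
  improved.initCause(cause)
  improved
}

def writeWithDebugInfo(obj: Any, extraDebugInfo: Boolean = true): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val objOut = new ObjectOutputStream(bytes)
  try {
    objOut.writeObject(obj)
  } catch {
    // Mirror the diff: only pay for the extra diagnostics when the flag is on.
    case e: NotSerializableException if extraDebugInfo => throw describeFailure(obj, e)
  } finally {
    objOut.close()
  }
  bytes.toByteArray
}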
