Skip to content

Commit c15162c

Browse files
committed
Reorganize StreamingContext implicit to improve API convenience
1 parent bf1a6aa commit c15162c

File tree

4 files changed

+54
-7
lines changed

4 files changed

+54
-7
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
2222

2323
import scala.collection.Map
2424
import scala.collection.mutable.Queue
25-
import scala.language.implicitConversions
2625
import scala.reflect.ClassTag
2726

2827
import akka.actor.{Props, SupervisorStrategy}
@@ -523,9 +522,11 @@ object StreamingContext extends Logging {
523522

524523
private[streaming] val DEFAULT_CLEANER_TTL = 3600
525524

526-
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
525+
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
526+
"kept here only for backward compatibility.", "1.3.0")
527+
def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
527528
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
528-
new PairDStreamFunctions[K, V](stream)
529+
DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
529530
}
530531

531532
/**

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,6 @@ object JavaPairDStream {
815815

816816
def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
817817
: JavaPairDStream[K, JLong] = {
818-
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
818+
DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
819819
}
820820
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.dstream
2020

2121
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
2222

23-
import scala.deprecated
2423
import scala.collection.mutable.HashMap
24+
import scala.language.implicitConversions
2525
import scala.reflect.ClassTag
2626
import scala.util.matching.Regex
2727

@@ -802,10 +802,21 @@ abstract class DStream[T: ClassTag] (
802802
}
803803
}
804804

805-
private[streaming] object DStream {
805+
object DStream {
806+
807+
// `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
808+
// `import StreamingContext._` to enable it. Now we move it here to make the compiler find
809+
// it automatically. However, we still keep the old function in StreamingContext for backward
810+
// compatibility and forward to the following function directly.
811+
812+
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
813+
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
814+
PairDStreamFunctions[K, V] = {
815+
new PairDStreamFunctions[K, V](stream)
816+
}
806817

807818
/** Get the creation site of a DStream from the stack trace of when the DStream is created. */
808-
def getCreationSite(): CallSite = {
819+
private[streaming] def getCreationSite(): CallSite = {
809820
val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
810821
val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
811822
val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streamingtest
19+
20+
/**
21+
* A test suite to make sure all `implicit` functions work correctly.
22+
*
23+
* As `implicit` is a compiler feature, we don't need to run this class.
24+
* What we need to do is making the compiler happy.
25+
*/
26+
class ImplicitSuite {
27+
28+
// We only want to test if `implict` works well with the compiler, so we don't need a real DStream.
29+
def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
30+
31+
def testToPairDStreamFunctions(): Unit = {
32+
val rdd: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
33+
rdd.groupByKey()
34+
}
35+
}

0 commit comments

Comments
 (0)