
Commit 19797f9

clean up

1 parent 6ebceca · commit 19797f9

5 files changed: 8 additions & 11 deletions

python/pyspark/streaming/context.py

Lines changed: 3 additions & 3 deletions

@@ -15,16 +15,16 @@
 # limitations under the License.
 #
 
+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import
+
 from pyspark import RDD
 from pyspark.serializers import UTF8Deserializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
 from pyspark.streaming.util import RDDFunction
 
-from py4j.java_collections import ListConverter
-from py4j.java_gateway import java_import
-
 __all__ = ["StreamingContext"]
 
python/pyspark/streaming/tests.py

Lines changed: 2 additions & 2 deletions

@@ -348,7 +348,7 @@ def test_count_by_value_and_window(self):
         input = [range(1), range(2), range(3), range(4), range(5), range(6)]
 
         def func(dstream):
-            return dstream.countByValueAndWindow(6, 1)
+            return dstream.countByValueAndWindow(5, 1)
 
         expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
         self._test_func(input, func, expected)

@@ -357,7 +357,7 @@ def test_group_by_key_and_window(self):
         input = [[('a', i)] for i in range(5)]
 
         def func(dstream):
-            return dstream.groupByKeyAndWindow(4, 1).mapValues(list)
+            return dstream.groupByKeyAndWindow(3, 1).mapValues(list)
 
         expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
                     [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
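For context, a minimal sketch (not part of this commit) of driving the windowed operators these tests exercise. It assumes a local two-core master, a 1-second batch interval, and a queueStream input, so the integer arguments read as window and slide durations in seconds; the app name and checkpoint path are placeholders.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "window-sketch")
ssc = StreamingContext(sc, 1)   # 1-second batch interval
ssc.checkpoint("checkpoint")    # windowed counting keeps state across batches

# Feed one small RDD per batch, mirroring the test input above.
pairs = ssc.queueStream([sc.parallelize([('a', i)]) for i in range(5)])

# Every 1 second, group all values per key seen in the last 3 seconds,
# matching groupByKeyAndWindow(3, 1) in the test.
windowed = pairs.groupByKeyAndWindow(3, 1).mapValues(list)
windowed.pprint()

ssc.start()
ssc.awaitTermination(10)  # wait at most ~10 seconds
ssc.stop()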

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 1 deletion

@@ -413,7 +413,7 @@ class StreamingContext private[streaming] (
       dstreams: Seq[DStream[_]],
       transformFunc: (Seq[RDD[_]], Time) => RDD[T]
     ): DStream[T] = {
-    new TransformedDStream[T](dstreams, (transformFunc))
+    new TransformedDStream[T](dstreams, transformFunc)
   }
 
   /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
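This transform variant combines several DStreams into one and backs the multi-stream transform exposed to Python. As a hedged illustration only, a sketch using PySpark's StreamingContext.transform (present in later releases; stream names are placeholders), where the function receives one RDD per input stream, in the same order as the list:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "transform-sketch")
ssc = StreamingContext(sc, 1)

a = ssc.queueStream([sc.parallelize([1, 2, 3])])
b = ssc.queueStream([sc.parallelize([4, 5, 6])])

# Each batch, union the latest RDD from each input stream into one RDD.
merged = ssc.transform([a, b], lambda rdds: rdds[0].union(rdds[1]))
merged.pprint()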

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 0 additions & 4 deletions

@@ -549,10 +549,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
  * JavaStreamingContext object contains a number of utility functions.
  */
 object JavaStreamingContext {
-  implicit def fromStreamingContext(ssc: StreamingContext):
-    JavaStreamingContext = new JavaStreamingContext(ssc)
-
-  implicit def toStreamingContext(jssc: JavaStreamingContext): StreamingContext = jssc.ssc
 
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 2 additions & 1 deletion

@@ -41,7 +41,7 @@ trait PythonRDDFunction {
 /**
  * Wrapper for PythonRDDFunction
  */
-class RDDFunction(pfunc: PythonRDDFunction)
+private[python] class RDDFunction(pfunc: PythonRDDFunction)
   extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
 
   def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {

@@ -68,6 +68,7 @@ class RDDFunction(pfunc: PythonRDDFunction)
     some(pfunc.call(time.milliseconds, List(wrapRDD(rdd), wrapRDD(rdd2)).asJava))
   }
 
+  // for JFunction2
   def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[Array[Byte]] = {
     pfunc.call(time.milliseconds, rdds)
   }
