22
22
from pyspark .streaming .dstream import DStream
23
23
24
24
from py4j .java_collections import ListConverter
25
+ from py4j .java_gateway import java_import
25
26
26
27
__all__ = ["StreamingContext" ]
27
28
@@ -72,7 +73,7 @@ def __init__(self, sparkContext, duration):
72
73
should be set, either through the named parameters here or through C{conf}.
73
74
74
75
@param sparkContext: L{SparkContext} object.
75
- @param duration: A L{Duration} object or seconds for SparkStreaming.
76
+ @param duration: seconds for SparkStreaming.
76
77
77
78
"""
78
79
self ._sc = sparkContext
@@ -89,6 +90,9 @@ def _start_callback_server(self):
89
90
gw ._python_proxy_port = gw ._callback_server .port # update port with real port
90
91
91
92
def _initialize_context (self , sc , duration ):
93
+ java_import (self ._jvm , "org.apache.spark.streaming.*" )
94
+ java_import (self ._jvm , "org.apache.spark.streaming.api.java.*" )
95
+ java_import (self ._jvm , "org.apache.spark.streaming.api.python.*" )
92
96
return self ._jvm .JavaStreamingContext (sc ._jsc , self ._jduration (duration ))
93
97
94
98
def _jduration (self , seconds ):
@@ -217,7 +221,6 @@ def union(self, *dstreams):
217
221
raise ValueError ("should have at least one DStream to union" )
218
222
if len (dstreams ) == 1 :
219
223
return dstreams [0 ]
220
- self ._check_serialzers (dstreams )
221
224
first = dstreams [0 ]
222
225
jrest = ListConverter ().convert ([d ._jdstream for d in dstreams [1 :]],
223
226
SparkContext ._gateway ._gateway_client )
0 commit comments