
1.1 updated #1

Merged: 308 commits, Sep 23, 2014
Commits
3334169
fixed PEP-008 violation
giwa Aug 11, 2014
e8c7bfc
remove export PYSPARK_PYTHON in spark submit
giwa Aug 11, 2014
bdde697
removed unnesessary changes
giwa Aug 11, 2014
a65f302
edited the comment to add more precise description
giwa Aug 11, 2014
90a6484
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
0704b86
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
080541a
broke something
giwa Aug 14, 2014
2112638
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
536def4
basic function test cases are passed
giwa Aug 15, 2014
a14c7e1
modified streaming test case to add coment
giwa Aug 15, 2014
e3033fc
remove waste duplicated code
giwa Aug 15, 2014
89ae38a
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
ea9c873
added TODO coments
giwa Aug 16, 2014
d8b593b
add comments
giwa Aug 18, 2014
e7ebb08
removed wasted print in DStream
giwa Aug 18, 2014
636090a
added sparkContext as input parameter in StreamingContext
giwa Aug 18, 2014
a3d2379
added gorupByKey testcase
giwa Aug 18, 2014
665bfdb
added testcase for combineByKey
giwa Aug 18, 2014
5c3a683
initial commit for pySparkStreaming
giwa Jul 9, 2014
e497b9b
comment PythonDStream.PairwiseDStream
Jul 15, 2014
6e0d9c7
modify dstream.py to fix indent error
Jul 16, 2014
9af03f4
added reducedByKey not working yet
Jul 16, 2014
dcf243f
implementing transform function in Python
Jul 16, 2014
c5518b4
modified the code base on comment in https://github.com/tdas/spark/pu…
Jul 16, 2014
3758175
add coment for hack why PYSPARK_PYTHON is needed in spark-submit
Jul 16, 2014
e551e13
add coment for hack why PYSPARK_PYTHON is needed in spark-submit
Jul 16, 2014
2adca84
remove not implemented DStream functions in python
Jul 16, 2014
5594bd4
revert pom.xml
Jul 16, 2014
490e338
sorted the import following Spark coding convention
Jul 16, 2014
856d98e
add empty line
Jul 16, 2014
4ce4058
remove unused import in python
Jul 16, 2014
02f618a
initial commit for socketTextStream
Jul 16, 2014
4b69fb1
fied input of socketTextDStream
Jul 16, 2014
57fb740
added doctest for pyspark.streaming.duration
Jul 17, 2014
967dc26
fixed typo of network_workdcount.py
Jul 17, 2014
7f7c5d1
delete old file
Jul 17, 2014
d25d5cf
added reducedByKey not working yet
Jul 16, 2014
0b8b7d0
reduceByKey is working
Jul 17, 2014
d1ee6ca
edit python sparkstreaming example
Jul 18, 2014
a9f4ecb
added count operation but this implementation need double check
Jul 19, 2014
05459c6
fix map function
Jul 20, 2014
9fa249b
clean up code
Jul 20, 2014
aeaf8a5
clean up codes
Jul 20, 2014
5e822d4
remove waste file
Jul 20, 2014
4eff053
Implemented DStream.foreachRDD in the Python API using Py4J callback …
tdas Jul 23, 2014
4caae3f
Added missing file
tdas Aug 1, 2014
c9fc124
Added extra line.
tdas Aug 1, 2014
19ddcdd
tried to restart callback server
Aug 2, 2014
b47b5fd
Kill py4j callback server properly
Aug 3, 2014
b6468e6
Removed the waste line
giwa Aug 3, 2014
b8d7d24
implemented reduce and count function in Dstream
giwa Aug 4, 2014
189dcea
clean up examples
giwa Aug 4, 2014
79c5809
added stop in StreamingContext
giwa Aug 4, 2014
5a9b525
clean up dstream.py
giwa Aug 4, 2014
ea4b06b
initial commit for testcase
giwa Aug 4, 2014
5d22c92
WIP
giwa Aug 4, 2014
c880a33
update comment
giwa Aug 4, 2014
1fd12ae
WIP
giwa Aug 4, 2014
c05922c
WIP: added PythonTestInputStream
giwa Aug 5, 2014
1f68b78
WIP
giwa Aug 7, 2014
3dda31a
WIP added test case
giwa Aug 11, 2014
7f96294
added basic operation test cases
giwa Aug 11, 2014
fa75d71
delete waste file
giwa Aug 11, 2014
8efa266
fixed PEP-008 violation
giwa Aug 11, 2014
3a671cc
remove export PYSPARK_PYTHON in spark submit
giwa Aug 11, 2014
774f18d
removed unnesessary changes
giwa Aug 11, 2014
33c0f94
edited the comment to add more precise description
giwa Aug 11, 2014
4f2d7e6
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
9767712
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
35933e1
broke something
giwa Aug 14, 2014
7051a84
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
99e4bb3
basic function test cases are passed
giwa Aug 15, 2014
580fbc2
modified streaming test case to add coment
giwa Aug 15, 2014
94f2b65
remove waste duplicated code
giwa Aug 15, 2014
e9fab72
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
4aa99e4
added TODO coments
giwa Aug 16, 2014
6d8190a
add comments
giwa Aug 18, 2014
14d4c0e
removed wasted print in DStream
giwa Aug 18, 2014
97742fe
added sparkContext as input parameter in StreamingContext
giwa Aug 18, 2014
e162822
added gorupByKey testcase
giwa Aug 18, 2014
e70f706
added testcase for combineByKey
giwa Aug 18, 2014
f1798c4
merge with master
giwa Aug 18, 2014
185fdbf
merge with master
giwa Aug 19, 2014
199e37f
adopted the latest compression way of python command
giwa Aug 19, 2014
58150f5
Changed the test case to focus the test operation
giwa Aug 19, 2014
09a28bf
improve testcases
giwa Aug 19, 2014
268a6a5
Changed awaitTermination not to call awaitTermincation in Scala. Just…
giwa Aug 19, 2014
4dedd2d
change test case not to use awaitTermination
giwa Aug 19, 2014
171edeb
clean up
giwa Aug 20, 2014
f0ea311
clean up code
giwa Aug 21, 2014
1d84142
remove unimplement test
giwa Aug 21, 2014
583e66d
move tests for streaming inside streaming directory
giwa Aug 21, 2014
b7dab85
improve test case
giwa Aug 21, 2014
0d30109
fixed pep8 violation
giwa Aug 21, 2014
24f95db
clen up examples
giwa Aug 21, 2014
9c85e48
clean up exmples
giwa Aug 21, 2014
7339df2
fixed typo
giwa Aug 21, 2014
9d1de23
revert pom.xml
giwa Aug 21, 2014
4f82c89
remove duplicated import
giwa Aug 21, 2014
50fd6f9
revert pom.xml
giwa Aug 21, 2014
93f7637
fixed explanaiton
giwa Aug 21, 2014
acfcaeb
revert pom.xml
giwa Aug 21, 2014
3b27bd4
remove the last brank line
giwa Aug 21, 2014
2ea769e
added comment in dstream._test_output
giwa Aug 21, 2014
c97377c
delete inproper comments
giwa Aug 21, 2014
67473a9
delete not implemented functions
giwa Aug 21, 2014
d9d59fe
Fix scalastyle errors
Aug 26, 2014
1fd6bc7
Merge pull request #2 from mattf/giwa-master
giwa Aug 28, 2014
4afa390
clean up code
giwa Aug 31, 2014
d68b568
clean up code
giwa Aug 31, 2014
da09768
added StreamingContext.remember
giwa Aug 31, 2014
f5bfb70
added StreamingContext.sparkContext
giwa Aug 31, 2014
fdc9125
added comment for StreamingContext.sparkContext
giwa Aug 31, 2014
ee50c5a
added atexit to handle callback server
giwa Aug 31, 2014
f7bc8f9
WIP:added more test for StreamingContext
giwa Aug 31, 2014
150b94c
added some StreamingContextTestSuite
giwa Sep 1, 2014
454981d
initial commit for pySparkStreaming
giwa Jul 9, 2014
b406252
comment PythonDStream.PairwiseDStream
Jul 15, 2014
87438e2
modify dstream.py to fix indent error
Jul 16, 2014
d7b4d6f
added reducedByKey not working yet
Jul 16, 2014
1a0f065
implementing transform function in Python
Jul 16, 2014
17a74c6
modified the code base on comment in https://github.com/tdas/spark/pu…
Jul 16, 2014
494cae5
remove not implemented DStream functions in python
Jul 16, 2014
e1df940
revert pom.xml
Jul 16, 2014
5bac7ec
revert streaming/pom.xml
Jul 16, 2014
d2099d8
sorted the import following Spark coding convention
Jul 16, 2014
224fc5e
add empty line
Jul 16, 2014
bb7ccf3
remove unused import in python
Jul 16, 2014
f746109
initial commit for socketTextStream
Jul 16, 2014
0d1b954
fied input of socketTextDStream
Jul 16, 2014
ccfd214
added doctest for pyspark.streaming.duration
Jul 17, 2014
b31446a
fixed typo of network_workdcount.py
Jul 17, 2014
dc6995d
delete old file
Jul 17, 2014
c455c8d
added reducedByKey not working yet
Jul 16, 2014
6f98e50
reduceByKey is working
Jul 17, 2014
15feea9
edit python sparkstreaming example
Jul 18, 2014
d3ee86a
added count operation but this implementation need double check
Jul 19, 2014
72b9738
fix map function
Jul 20, 2014
bab31c1
clean up code
Jul 20, 2014
0a8bbbb
clean up codes
Jul 20, 2014
678e854
remove waste file
Jul 20, 2014
b1d2a30
Implemented DStream.foreachRDD in the Python API using Py4J callback …
tdas Jul 23, 2014
05e991b
Added missing file
tdas Aug 1, 2014
9ab8952
Added extra line.
tdas Aug 1, 2014
84a9668
tried to restart callback server
Aug 2, 2014
3b498e1
Kill py4j callback server properly
Aug 3, 2014
b349649
Removed the waste line
giwa Aug 3, 2014
3c45cd2
implemented reduce and count function in Dstream
giwa Aug 4, 2014
d2c01ba
clean up examples
giwa Aug 4, 2014
c462bb3
added stop in StreamingContext
giwa Aug 4, 2014
4d40d63
clean up dstream.py
giwa Aug 4, 2014
29c2bc5
initial commit for testcase
giwa Aug 4, 2014
fe648e3
WIP
giwa Aug 4, 2014
8a0fbbc
update comment
giwa Aug 4, 2014
1523b66
WIP
giwa Aug 4, 2014
1df77f5
WIP: added PythonTestInputStream
giwa Aug 5, 2014
9ad6855
WIP
giwa Aug 7, 2014
ce2acd2
WIP added test case
giwa Aug 11, 2014
878bad7
added basic operation test cases
giwa Aug 11, 2014
f21cab3
delete waste file
giwa Aug 11, 2014
3d37822
fixed PEP-008 violation
giwa Aug 11, 2014
253a863
removed unnesessary changes
giwa Aug 11, 2014
bb10956
edited the comment to add more precise description
giwa Aug 11, 2014
270a9e1
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
bcdec33
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
ff14070
broke something
giwa Aug 14, 2014
3000b2b
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
13fb44c
basic function test cases are passed
giwa Aug 15, 2014
18c8723
modified streaming test case to add coment
giwa Aug 15, 2014
f76c182
remove waste duplicated code
giwa Aug 15, 2014
74535d4
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
16aa64f
added TODO coments
giwa Aug 16, 2014
e54f986
add comments
giwa Aug 18, 2014
10b5b04
removed wasted print in DStream
giwa Aug 18, 2014
10ab87b
added sparkContext as input parameter in StreamingContext
giwa Aug 18, 2014
5625bdc
added gorupByKey testcase
giwa Aug 18, 2014
c214199
added testcase for combineByKey
giwa Aug 18, 2014
0b99bec
initial commit for pySparkStreaming
giwa Jul 9, 2014
41886c2
comment PythonDStream.PairwiseDStream
Jul 15, 2014
66fcfff
modify dstream.py to fix indent error
Jul 16, 2014
38adf95
added reducedByKey not working yet
Jul 16, 2014
4bcb318
implementing transform function in Python
Jul 16, 2014
247fd74
modified the code base on comment in https://github.com/tdas/spark/pu…
Jul 16, 2014
dd6de81
initial commit for socketTextStream
Jul 16, 2014
f485b1d
fied input of socketTextDStream
Jul 16, 2014
0df7111
delete old file
Jul 17, 2014
58591d2
reduceByKey is working
Jul 17, 2014
98c2a00
added count operation but this implementation need double check
Jul 19, 2014
eb4bf48
fix map function
Jul 20, 2014
6197a11
clean up code
Jul 20, 2014
2ad7bd3
clean up codes
Jul 20, 2014
fe02547
remove waste file
Jul 20, 2014
4f07163
Implemented DStream.foreachRDD in the Python API using Py4J callback …
tdas Jul 23, 2014
54b5358
tried to restart callback server
Aug 2, 2014
88f7506
Kill py4j callback server properly
Aug 3, 2014
1b83354
Removed the waste line
giwa Aug 3, 2014
92e333e
implemented reduce and count function in Dstream
giwa Aug 4, 2014
0b09cff
added stop in StreamingContext
giwa Aug 4, 2014
932372a
clean up dstream.py
giwa Aug 4, 2014
376e3ac
WIP
giwa Aug 4, 2014
1934726
update comment
giwa Aug 4, 2014
019ef38
WIP
giwa Aug 4, 2014
5c04a5f
WIP: added PythonTestInputStream
giwa Aug 5, 2014
bd3ba53
WIP
giwa Aug 7, 2014
9cde7c9
WIP added test case
giwa Aug 11, 2014
b3b0362
added basic operation test cases
giwa Aug 11, 2014
99410be
delete waste file
giwa Aug 11, 2014
c1d546e
fixed PEP-008 violation
giwa Aug 11, 2014
af610d3
removed unnesessary changes
giwa Aug 11, 2014
953deb0
edited the comment to add more precise description
giwa Aug 11, 2014
f67cf57
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
1e126bf
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
795b2cd
broke something
giwa Aug 14, 2014
8dcda84
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
c5ecfc1
basic function test cases are passed
giwa Aug 15, 2014
2a06cdb
remove waste duplicated code
giwa Aug 15, 2014
99ce042
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
ddd4ee1
added TODO coments
giwa Aug 16, 2014
af336b7
add comments
giwa Aug 18, 2014
455e5af
removed wasted print in DStream
giwa Aug 18, 2014
58e41ff
merge with master
giwa Aug 18, 2014
e80647e
adopted the latest compression way of python command
giwa Aug 19, 2014
c00e091
change test case not to use awaitTermination
giwa Aug 19, 2014
3166d31
clean up
giwa Aug 20, 2014
f198d14
clean up code
giwa Aug 21, 2014
b171ec3
fixed pep8 violation
giwa Aug 21, 2014
f04882c
clen up examples
giwa Aug 21, 2014
62dc7a3
clean up exmples
giwa Aug 21, 2014
7dc7391
fixed typo
giwa Aug 21, 2014
6ae3caa
revert pom.xml
giwa Aug 21, 2014
fa4af88
remove duplicated import
giwa Aug 21, 2014
066ba90
revert pom.xml
giwa Aug 21, 2014
8ed93af
fixed explanaiton
giwa Aug 21, 2014
fbed8da
revert pom.xml
giwa Aug 21, 2014
bebb3f3
remove the last brank line
giwa Aug 21, 2014
b0f2015
added comment in dstream._test_output
giwa Aug 21, 2014
f385976
delete inproper comments
giwa Aug 21, 2014
c0a06bc
delete not implemented functions
giwa Aug 21, 2014
2fdf0de
Fix scalastyle errors
Aug 26, 2014
d542743
clean up code
giwa Aug 31, 2014
d39f102
added StreamingContext.remember
giwa Aug 31, 2014
63c881a
added StreamingContext.sparkContext
giwa Aug 31, 2014
d5f5fcb
added comment for StreamingContext.sparkContext
giwa Aug 31, 2014
8ffdbf1
added atexit to handle callback server
giwa Aug 31, 2014
4a59e1e
WIP:added more test for StreamingContext
giwa Aug 31, 2014
2d32a74
added some StreamingContextTestSuite
giwa Sep 1, 2014
e685853
meged with rebased 1.1 branch
giwa Sep 20, 2014
5cdb6fa
changed for SCCallSiteSync
giwa Sep 21, 2014
550dfd9
WIP fixing 1.1 merge
giwa Sep 21, 2014
dc23b38
Merge remote-tracking branch 'upstream/master' into 1.1-updated
giwa Sep 23, 2014
Diff view
@@ -288,7 +288,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
  */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =
20 changes: 20 additions & 0 deletions examples/src/main/python/streaming/network_wordcount.py
@@ -0,0 +1,20 @@
import sys

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: wordcount <hostname> <port>"
        exit(-1)
    ssc = StreamingContext(appName="PythonStreamingNetworkWordCount",
                           duration=Seconds(1))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a,b: a+b)
    counts.pyprint()

    ssc.start()
    ssc.awaitTermination()
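
To try the socket example above locally, anything that serves newline-delimited text on the chosen port will do, e.g. `nc -lk 9999`. The sketch below is a hypothetical pure-Python stand-in for that; the host, port, and sample sentence are illustrative and not part of the example.

```python
import socket
import time

# Hypothetical feeder for the network word count example: listen on the port
# the example connects to and emit one line of text per second.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)
conn, _ = server.accept()  # the socketTextStream receiver connects here
try:
    while True:
        conn.sendall(b"to be or not to be\n")
        time.sleep(1)
finally:
    conn.close()
    server.close()
```
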
21 changes: 21 additions & 0 deletions examples/src/main/python/streaming/wordcount.py
@@ -0,0 +1,21 @@
import sys

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <directory>"
        exit(-1)

    ssc = StreamingContext(appName="PythonStreamingWordCount",
                           duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda a, b: a+b)
    counts.pyprint()

    ssc.start()
    ssc.awaitTermination()
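
The directory example above builds on textFileStream, whose docstring later in this diff notes that files are only picked up when they are moved into the monitored directory and that names starting with `.` are ignored. Below is a hypothetical helper that follows that contract by staging a dot-prefixed temp file in the same directory and renaming it into place; the directory path, file name, and text are illustrative.

```python
import os
import shutil
import tempfile

def drop_into(monitored_dir, name, text):
    # Stage the data in a dot-prefixed temp file (ignored by textFileStream),
    # then rename it so it appears atomically under a visible name.
    fd, tmp_path = tempfile.mkstemp(dir=monitored_dir, prefix=".staging-")
    with os.fdopen(fd, "w") as f:
        f.write(text)
    shutil.move(tmp_path, os.path.join(monitored_dir, name))

# The monitored directory must already exist and match the one passed to the example.
drop_into("/tmp/streaming-input", "batch-0001.txt", "hello streaming world\n")
```
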
3 changes: 3 additions & 0 deletions python/pyspark/java_gateway.py
@@ -108,6 +108,9 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
Empty file.
186 changes: 186 additions & 0 deletions python/pyspark/streaming/context.py
@@ -0,0 +1,186 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
from signal import signal, SIGTERM, SIGINT
import atexit
import time

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream
from pyspark.streaming.duration import Duration

from py4j.java_collections import ListConverter


class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{DStream}s and
broadcast variables on that cluster.
"""

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None, sparkContext=None, duration=None):
        """
        Create a new StreamingContext. At least the master, app name, and duration
        should be set, either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.
        @param sparkContext: L{SparkContext} object.
        @param duration: A L{Duration} object for Spark Streaming.

        """

        if not isinstance(duration, Duration):
            raise TypeError("Input should be pyspark.streaming.duration.Duration object")

        if sparkContext is None:
            # Create the Python SparkContext
            self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
                                    pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                                    serializer=serializer, conf=conf, gateway=gateway)
        else:
            self._sc = sparkContext

        # Start the py4j callback server.
        # The callback server is needed only by Spark Streaming; therefore it
        # is started in StreamingContext.
        SparkContext._gateway.restart_callback_server()
        self._set_clean_up_handler()
        self._jvm = self._sc._jvm
        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

    # Initialize StreamingContext in a method to allow subclass-specific initialization
    def _initialize_context(self, jspark_context, jduration):
        return self._jvm.JavaStreamingContext(jspark_context, jduration)

    def _set_clean_up_handler(self):
        """ Set the clean-up handler using atexit """

        def clean_up_handler():
            SparkContext._gateway.shutdown()

        atexit.register(clean_up_handler)
        # atexit is not called when the program is killed by a signal not handled by
        # Python.
        for sig in (SIGINT, SIGTERM):
            signal(sig, clean_up_handler)

    @property
    def sparkContext(self):
        """
        Return SparkContext which is associated with this StreamingContext.
        """
        return self._sc

    def start(self):
        """
        Start the execution of the streams.
        """
        self._jssc.start()

    def awaitTermination(self, timeout=None):
        """
        Wait for the execution to stop.
        @param timeout: time to wait in milliseconds
        """
        if timeout is None:
            self._jssc.awaitTermination()
        else:
            self._jssc.awaitTermination(timeout)

    def remember(self, duration):
        """
        Set each DStream in this context to remember RDDs it generated in the last given duration.
        DStreams remember RDDs only for a limited duration of time and release them for garbage
        collection. This method allows the developer to specify how long to remember the RDDs (
        if the developer wishes to query old data outside the DStream computation).
        @param duration: pyspark.streaming.duration.Duration object.
               Minimum duration that each DStream should remember its RDDs.
        """
        if not isinstance(duration, Duration):
            raise TypeError("Input should be pyspark.streaming.duration.Duration object")

        self._jssc.remember(duration._jduration)

    # TODO: add storageLevel
    def socketTextStream(self, hostname, port):
        """
        Create an input stream from a TCP source hostname:port. Data is received using
        a TCP socket and the received bytes are interpreted as UTF8-encoded, '\n'-delimited
        lines.
        """
        return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())

    def textFileStream(self, directory):
        """
        Create an input stream that monitors a Hadoop-compatible file system
        for new files and reads them as text files. Files must be written to the
        monitored directory by "moving" them from another location within the same
        file system. File names starting with . are ignored.
        """
        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """
        Stop the execution of the streams immediately (does not wait for all received data
        to be processed).
        """
        self._jssc.stop(stopSparkContext, stopGraceFully)
        if stopSparkContext:
            self._sc.stop()

        # Shut down only the callback server; the py4j clients are shut down by the
        # clean-up handler.
        SparkContext._gateway._shutdown_callback_server()

    def _testInputStream(self, test_inputs, numSlices=None):
        """
        This function is only for unittest.
        It requires a list as input, and returns the i_th element at the i_th batch
        under manual clock.
        """
        test_rdds = list()
        test_rdd_deserializers = list()
        for test_input in test_inputs:
            test_rdd = self._sc.parallelize(test_input, numSlices)
            test_rdds.append(test_rdd._jrdd)
            test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
        # All deserializers have to be the same.
        # TODO: add deserializer validation
        jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
        jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

        return DStream(jinput_stream, self, test_rdd_deserializers[0])
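
Taken together, the API added in this file follows the lifecycle used by the bundled examples: construct a context with a Duration, derive and transform a DStream, then start, wait, and stop. A minimal sketch follows; the app name, host, port, and timeout are illustrative, the context calls are the API defined above, and the DStream operations mirror the examples.

```python
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

ssc = StreamingContext(appName="LifecycleSketch", duration=Seconds(1))

lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pyprint()

ssc.start()
ssc.awaitTermination(timeout=60000)  # milliseconds, per awaitTermination above
ssc.stop(stopSparkContext=True)      # also stops the underlying SparkContext
```
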