
Commit 454981d

initial commit for pySparkStreaming
1 parent 78d4220 commit 454981d

16 files changed: +1043 −5 lines

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -21,7 +21,11 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
+<<<<<<< HEAD
     <version>1.2.0-SNAPSHOT</version>
+=======
+    <version>1.0.0</version>
+>>>>>>> initial commit for pySparkStreaming
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -288,7 +288,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
  */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =
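
PairwiseRDD backs PySpark's shuffle path, and widening it from private to private[spark] presumably lets other code under org.apache.spark (such as the new streaming Python bindings) construct it as well. As a hedged reminder of the Python-side behavior that ultimately relies on it, any key-based shuffle goes through this class on the JVM side; the snippet below is illustrative only and assumes a local PySpark installation:

    from pyspark import SparkContext

    # Illustration only: a key-based shuffle (reduceByKey) is the kind of PySpark
    # operation that is served by PairwiseRDD on the JVM side.
    sc = SparkContext("local", "pairwise-demo")  # assumed local mode for the sketch
    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(pairs.reduceByKey(lambda x, y: x + y).collect())  # e.g. [('a', 2), ('b', 1)]
    sc.stop()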

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ object PythonRunner {
     val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
+    env.put("PYSPARK_PYTHON", pythonExec)
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     val process = builder.start()
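
The new PYSPARK_PYTHON entry presumably ensures that any Python processes spawned later reuse the same interpreter that launched the driver script. A minimal sketch of how Python-side code could read it back (the fallback default below is an assumption, not taken from this commit):

    import os

    # Hedged sketch: pick up the interpreter recorded by PythonRunner, falling back
    # to plain "python" if the variable is not set (assumed default).
    python_exec = os.environ.get("PYSPARK_PYTHON", "python")
    print("worker processes would be launched with:", python_exec)
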
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
import sys
from operator import add

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <directory>"
        exit(-1)
    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    fm_lines = lines.flatMap(lambda x: x.split(" "))
    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
    mapped_lines = fm_lines.map(lambda x: (x, 1))

    fm_lines.pyprint()
    filtered_lines.pyprint()
    mapped_lines.pyprint()
    ssc.start()
    ssc.awaitTermination()
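
This script watches a directory and turns each newly arriving text file into one batch of input. A hedged way to exercise it locally, assuming the example is already running with the same directory as its argument (all paths below are illustrative):

    import os
    import tempfile
    import time

    # Hypothetical watched directory; pass this same path to the word-count script.
    watch_dir = tempfile.mkdtemp()
    print("start the example with argument:", watch_dir)

    # Every file that appears becomes one streaming batch of lines.
    for i in range(3):
        with open(os.path.join(watch_dir, "batch-%d.txt" % i), "w") as f:
            f.write("Spark streaming hello world\n")
        time.sleep(2)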

python/pyspark/java_gateway.py

Lines changed: 3 additions & 0 deletions
@@ -108,6 +108,9 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+    java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
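
These extra java_import calls are what let the short name JavaStreamingContext resolve on the Python side; the new context.py below relies on exactly that lookup via self._jvm. A hedged sketch of the same resolution against a freshly launched gateway (standalone use like this is an assumption; normally SparkContext drives the gateway for you):

    from pyspark.java_gateway import launch_gateway

    # Hedged sketch: once the streaming packages are imported into the gateway JVM,
    # JavaStreamingContext is reachable by its short name from Python.
    gateway = launch_gateway()
    JavaStreamingContext = gateway.jvm.JavaStreamingContext
    print(JavaStreamingContext)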

python/pyspark/streaming/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
__author__ = 'ktakagiw'

python/pyspark/streaming/context.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
__author__ = 'ktakagiw'


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.context import SparkContext

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream


class StreamingContext(object):
    """
    Main entry point for Spark Streaming functionality. A StreamingContext represents
    the connection to a Spark cluster, and can be used to create DStreams and
    broadcast variables on that cluster.
    """

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
            environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
            gateway=None, duration=None):
        """
        Create a new StreamingContext. At least the master, app name, and duration
        should be set, either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.
        @param duration: A L{Duration} object giving the batch interval for Spark Streaming.
        """
        # Create the Python SparkContext
        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                                serializer=serializer, conf=conf, gateway=gateway)
        self._jvm = self._sc._jvm
        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

    # Initialize the StreamingContext in a method to allow subclass-specific initialization
    def _initialize_context(self, jspark_context, jduration):
        return self._jvm.JavaStreamingContext(jspark_context, jduration)

    def actorStream(self, props, name, storageLevel, supervisorStrategy):
        raise NotImplementedError

    def addStreamingListener(self, streamingListener):
        raise NotImplementedError

    def awaitTermination(self, timeout=None):
        if timeout:
            self._jssc.awaitTermination(timeout)
        else:
            self._jssc.awaitTermination()

    def checkpoint(self, directory):
        raise NotImplementedError

    def fileStream(self, directory, filter=None, newFilesOnly=None):
        raise NotImplementedError

    def networkStream(self, receiver):
        raise NotImplementedError

    def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
        raise NotImplementedError

    def rawSocketStream(self, hostname, port, storagelevel):
        raise NotImplementedError

    def remember(self, duration):
        raise NotImplementedError

    def socketStream(self, hostname, port, converter, storageLevel):
        raise NotImplementedError

    def start(self):
        self._jssc.start()

    def stop(self, stopSparkContext=True):
        raise NotImplementedError

    def textFileStream(self, directory):
        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

    def transform(self, seq):
        raise NotImplementedError

    def union(self, seq):
        raise NotImplementedError
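
context.py hands duration._jduration straight to JavaStreamingContext, and the word-count example builds that argument with Seconds(1), but pyspark/streaming/duration.py itself is not part of this excerpt. Purely as a guess at the shape those call sites imply, a minimal sketch might look like the following; every name and detail here is an assumption rather than the commit's actual code:

    class Duration(object):
        """Hedged sketch: a batch interval expressed in milliseconds."""
        def __init__(self, millis):
            self._millis = millis
            # Assumption: the real module would set _jduration to a JVM-side
            # org.apache.spark.streaming.Duration built through the Py4J gateway.
            self._jduration = None

        def milliseconds(self):
            return self._millis


    def Seconds(seconds):
        """Hedged sketch: convenience constructor matching the Seconds(1) usage above."""
        return Duration(int(seconds * 1000))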
