
Commit 331cfcd

* Moved to package pyspark.sql.avro.functions
* Added avro artifact
* Some refactoring
* Formatting fixes
1 parent ea67dca commit 331cfcd
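
In user code the change is a one-line import move. A minimal round-trip sketch against the new package, mirroring the doctests added below (this assumes an active SparkSession named `spark` and the optional spark-avro jar on the classpath, e.g. via spark-submit --packages org.apache.spark:spark-avro_2.12:<version>; the coordinate is illustrative):

# Assumes `spark` (an active SparkSession) and the spark-avro jar on the classpath.
from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro  # was: pyspark.sql.functions

df = spark.createDataFrame([(1, Row(name='Alice', age=2))], ("key", "value"))
avroDf = df.select(to_avro(df.value).alias("avro"))  # struct column -> Avro binary
avroDf.show()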

File tree

9 files changed: 232 additions, 123 deletions


dev/sparktestsupport/modules.py

Lines changed: 4 additions & 0 deletions
@@ -178,6 +178,10 @@ def __hash__(self):
     ],
     sbt_test_goals=[
         "avro/test",
+    ],
+    python_test_goals=[
+        # doctests
+        "pyspark.sql.avro.functions"
     ]
 )


docs/sql-data-sources-avro.md

Lines changed: 14 additions & 14 deletions
@@ -141,31 +141,31 @@ StreamingQuery query = output
 </div>
 <div data-lang="python" markdown="1">
 {% highlight python %}
-from pyspark.sql.functions import from_avro, to_avro
+from pyspark.sql.avro.functions import from_avro, to_avro

 # `from_avro` requires Avro schema in JSON string format.
 jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

-df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1")
+df = spark\
+  .readStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("subscribe", "topic1")\
   .load()

 # 1. Decode the Avro data into a struct;
 # 2. Filter by column `favorite_color`;
 # 3. Encode the column `name` in Avro format.
-output = df
-  .select(from_avro("value", jsonFormatSchema).alias("user"))
-  .where("user.favorite_color == \"red\"")
+output = df\
+  .select(from_avro("value", jsonFormatSchema).alias("user"))\
+  .where('user.favorite_color == "red"')\
   .select(to_avro("user.name").alias("value"))

-query = output
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("topic", "topic2")
+query = output\
+  .writeStream\
+  .format("kafka")\
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
+  .option("topic", "topic2")\
   .start()

 {% endhighlight %}
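
Beyond the import move, this hunk fixes the snippet itself: the previous version split chained calls across lines with no continuation character, which is a SyntaxError in Python, and the added backslashes repair that (the `.where` literal is also rewritten with single quotes to drop the escaped double quotes). An equivalent style, shown here only for comparison and not the one the docs adopt, wraps each chain in parentheses:

# Parenthesized alternative to the backslash continuations (comparison only):
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load())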
python/pyspark/sql/avro/__init__.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ['functions']
python/pyspark/sql/avro/functions.py

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of built-in Avro functions
+"""
+
+
+from pyspark import since, SparkContext
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+
+@since(3.0)
+def from_avro(col, jsonFormatSchema, options={}):
+    """
+    Converts a binary column of Avro format into its corresponding catalyst value. The specified
+    schema must match the read data, otherwise the behavior is undefined: it may fail or return
+    an arbitrary result.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param col: the binary column.
+    :param jsonFormatSchema: the Avro schema in JSON string format.
+    :param options: options to control how the Avro record is parsed.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import from_avro, to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
+    >>> avroDf.collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
+    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
+    ...     "fields":[{"name":"age","type":["long","null"]},
+    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
+    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
+    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
+            _to_java_column(col), jsonFormatSchema, options)
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
+        raise
+    return Column(jc)
+
+
+@since(3.0)
+def to_avro(col):
+    """
+    Converts a column into binary of Avro format.
+
+    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
+    application as per the deployment section of "Apache Avro Data Source Guide".
+
+    :param col: the data column.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.avro.functions import to_avro
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_avro(df.value).alias("avro")).collect()
+    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    """
+
+    sc = SparkContext._active_spark_context
+    try:
+        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Avro", "avro", "avro", sc.version)
+        raise
+    return Column(jc)
+
+
+def _test():
+    import os
+    import sys
+    from pyspark.testing.utils import search_jar
+    avro_jar = search_jar("external/avro", "spark-avro")
+    if avro_jar is None:
+        print(
+            "Skipping all Avro Python tests as the optional Avro project was "
+            "not compiled into a JAR. To run these tests, "
+            "you need to build Spark with 'build/sbt -Pavro package' or "
+            "'build/mvn -Pavro package' before running this test.")
+        sys.exit(0)
+    else:
+        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+        jars_args = "--jars %s" % avro_jar
+        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+    import doctest
+    from pyspark.sql import Row, SparkSession
+    import pyspark.sql.avro.functions
+    globs = pyspark.sql.avro.functions.__dict__.copy()
+    spark = SparkSession.builder\
+        .master("local[4]")\
+        .appName("sql.avro.functions tests")\
+        .getOrCreate()
+    sc = spark.sparkContext
+    globs['sc'] = sc
+    globs['spark'] = spark
+    globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.avro.functions, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
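
The try/except in both functions keys off a py4j behavior: when the spark-avro jar is absent, `sc._jvm.org.apache.spark.sql.avro.functions.from_avro` resolves to a lazily created py4j JavaPackage placeholder rather than a JVM function, and calling it raises exactly the TypeError matched above. A small sketch of that failure mode (assumes a live SparkContext `sc` started without the Avro jar):

# With the jar missing, attribute lookup still succeeds (py4j builds package
# objects lazily); only the call itself fails.
fn = sc._jvm.org.apache.spark.sql.avro.functions.from_avro  # a JavaPackage
try:
    fn(None, "{}", {})  # the arguments never reach the JVM
except TypeError as e:
    print(str(e))  # 'JavaPackage' object is not callable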

python/pyspark/sql/functions.py

Lines changed: 0 additions & 58 deletions
@@ -2402,64 +2402,6 @@ def to_csv(col, options={}):
     return Column(jc)


-@since(3.0)
-def from_avro(col, jsonFormatSchema, options={}):
-    """
-    Converts a binary column of avro format into its corresponding catalyst value. The specified
-    schema must match the read data, otherwise the behavior is undefined: it may fail or return
-    arbitrary result.
-
-    Avro is built-in but external data source module since Spark 2.4. Please deploy the application
-    as per the deployment section of "Apache Avro Data Source Guide".
-
-    :param data: the binary column.
-    :param jsonFormatSchema: the avro schema in JSON string format.
-    :param options: options to control how the Avro record is parsed.
-
-    >>> from pyspark.sql import Row
-    >>> from pyspark.sql.functions import from_avro, to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
-    >>> avroDf.collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
-    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
-    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
-    ...     "fields":[{"name":"age","type":["long","null"]},
-    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
-    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
-    [Row(value=Row(avro=Row(age=2, name=u'Alice')))]
-    """
-
-    sc = SparkContext._active_spark_context
-    jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(_to_java_column(col),
-                                                               jsonFormatSchema, options)
-    return Column(jc)
-
-
-@since(3.0)
-def to_avro(col):
-    """
-    Converts a column into binary of avro format.
-
-    Avro is built-in but external data source module since Spark 2.4. Please deploy the application
-    as per the deployment section of "Apache Avro Data Source Guide".
-
-    :param data: the data column.
-
-    >>> from pyspark.sql import Row
-    >>> from pyspark.sql.functions import to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(to_avro(df.value).alias("avro")).collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
-    """
-
-    sc = SparkContext._active_spark_context
-    jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(col))
-    return Column(jc)
-
-
 @since(1.5)
 def size(col):
     """

python/pyspark/streaming/kinesis.py

Lines changed: 7 additions & 23 deletions
@@ -18,6 +18,8 @@
 from pyspark.serializers import NoOpSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming import DStream
+from pyspark.util import _print_missing_jar
+

 __all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']

@@ -82,7 +84,11 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
             helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
         except TypeError as e:
             if str(e) == "'JavaPackage' object is not callable":
-                KinesisUtils._printErrorMsg(ssc.sparkContext)
+                _print_missing_jar(
+                    "Streaming's Kinesis",
+                    "streaming-kinesis-asl",
+                    "streaming-kinesis-asl-assembly",
+                    ssc.sparkContext.version)
             raise
         jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                       regionName, initialPositionInStream, jduration, jlevel,

@@ -91,28 +97,6 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
         stream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))

-    @staticmethod
-    def _printErrorMsg(sc):
-        print("""
-________________________________________________________________________________________________
-
-  Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
-
-  1. Include the Kinesis library and its dependencies with in the
-     spark-submit command as
-
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
-
-  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
-     Then, include the jar in the spark-submit command as
-
-     $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
-

 class InitialPositionInStream(object):
     LATEST, TRIM_HORIZON = (0, 1)
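
The deleted `_printErrorMsg` is folded into the shared `_print_missing_jar` helper in pyspark/util.py (its import appears in the first hunk; the hunk adding it is not shown on this page). A plausible sketch of that helper, reconstructed from its two call sites in this commit and the deleted message, with the exact wording of the real helper possibly differing:

def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
    # Parameterized version of the message _printErrorMsg hardcoded for
    # Kinesis; reusable for Avro, Kinesis, and other optional jars.
    print("""
________________________________________________________________________________________________

  Spark %(lib_name)s libraries not found in class path. Try one of the following.

  1. Include the %(lib_name)s library and its dependencies in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-%(pkg_name)s:%(spark_version)s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-%(jar_name)s, Version = %(spark_version)s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-%(jar_name)s.jar> ...

________________________________________________________________________________________________

""" % {"lib_name": lib_name, "pkg_name": pkg_name, "jar_name": jar_name,
       "spark_version": spark_version})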

python/pyspark/testing/streamingutils.py

Lines changed: 3 additions & 28 deletions
@@ -14,40 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import glob
 import os
 import tempfile
 import time
 import unittest

 from pyspark import SparkConf, SparkContext, RDD
 from pyspark.streaming import StreamingContext
-
-
-def search_kinesis_asl_assembly_jar():
-    kinesis_asl_assembly_dir = os.path.join(
-        os.environ["SPARK_HOME"], "external/kinesis-asl-assembly")
-
-    # We should ignore the following jars
-    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
-
-    # Search jar in the project dir using the jar name_prefix for both sbt build and maven
-    # build because the artifact jars are in different directories.
-    name_prefix = "spark-streaming-kinesis-asl-assembly"
-    sbt_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/scala-*/%s-*.jar" % name_prefix))
-    maven_build = glob.glob(os.path.join(
-        kinesis_asl_assembly_dir, "target/%s_*.jar" % name_prefix))
-    jar_paths = sbt_build + maven_build
-    jars = [jar for jar in jar_paths if not jar.endswith(ignored_jar_suffixes)]
-
-    if not jars:
-        return None
-    elif len(jars) > 1:
-        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
-                         "remove all but one") % (", ".join(jars)))
-    else:
-        return jars[0]
+from pyspark.testing.utils import search_jar


 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py

@@ -59,7 +33,8 @@ def search_kinesis_asl_assembly_jar():
         "Skipping all Kinesis Python tests as environmental variable 'ENABLE_KINESIS_TESTS' "
         "was not set.")
 else:
-    kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+    kinesis_asl_assembly_jar = search_jar("external/kinesis-asl-assembly",
+                                          "spark-streaming-kinesis-asl-assembly")
     if kinesis_asl_assembly_jar is None:
         kinesis_requirement_message = (
             "Skipping all Kinesis Python tests as the optional Kinesis project was "