
Commit 8667c28

pgandhi committed
Merge branch 'master' of https://github.com/pgandhi999/spark into SPARK-25250
[SPARK-25250]: Upmerging with master to fix unit tests
2 parents: 5ad6efd + 4506dad

9 files changed (+86, -5 lines)

core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ class PagedTableSuite extends SparkFunSuite {
       override def goButtonFormPath: String = ""
     }
 
-    assert(pagedTable.pageNavigation(1, 10, 1) === Nil)
+    assert((pagedTable.pageNavigation(1, 10, 1).head \\ "li").map(_.text.trim) === Seq("1"))
     assert(
       (pagedTable.pageNavigation(1, 10, 2).head \\ "li").map(_.text.trim) === Seq("1", "2", ">"))
     assert(
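Note: the updated assertion replaces the old expectation that a single page yields no navigation (Nil); pageNavigation now always renders a pager, so the test inspects its markup instead. As a minimal sketch of the XML traversal the assertion relies on, using scala.xml's descendant selector (the <ul> literal below is illustrative, not the exact markup pageNavigation emits):

    import scala.xml.Elem

    val nav: Elem = <ul><li>1</li><li>2</li><li>&gt;</li></ul>
    // \\ "li" selects every <li> descendant; .text.trim extracts its label.
    val labels = (nav \\ "li").map(_.text.trim)  // Seq("1", "2", ">")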

docs/sql-data-sources-load-save-functions.md

Lines changed: 44 additions & 0 deletions
@@ -82,6 +82,50 @@ To load a CSV file you can use:
 </div>
 </div>
 
+The extra options are also used during write operation.
+For example, you can control bloom filters and dictionary encodings for ORC data sources.
+The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`.
+For Parquet, there exists `parquet.enable.dictionary`, too.
+To find more detailed information about the extra ORC/Parquet options,
+visit the official Apache ORC/Parquet websites.
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example manual_save_options_orc scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example manual_save_options_orc java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example manual_save_options_orc python/sql/datasource.py %}
+</div>
+
+<div data-lang="r" markdown="1">
+{% include_example manual_save_options_orc r/RSparkSQLExample.R %}
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+CREATE TABLE users_with_options (
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING ORC
+OPTIONS (
+  orc.bloom.filter.columns 'favorite_color',
+  orc.dictionary.key.threshold '1.0',
+  orc.column.encoding.direct 'name'
+)
+{% endhighlight %}
+
+</div>
+
+</div>
+
 ### Run SQL on files directly
 
 Instead of using read API to load a file into DataFrame and query it, you can also query that
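Note: the prose above mentions parquet.enable.dictionary but the commit adds no Parquet example. For completeness, a hedged sketch of the analogous Parquet write (not part of the commit; the output path is illustrative):

    usersDF.write.format("parquet")
      .option("parquet.enable.dictionary", "true")
      .save("users_with_options.parquet")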

examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java

Lines changed: 7 additions & 0 deletions
@@ -123,6 +123,13 @@ private static void runBasicDataSourceExample(SparkSession spark) {
       .option("header", "true")
       .load("examples/src/main/resources/people.csv");
     // $example off:manual_load_options_csv$
+    // $example on:manual_save_options_orc$
+    usersDF.write().format("orc")
+      .option("orc.bloom.filter.columns", "favorite_color")
+      .option("orc.dictionary.key.threshold", "1.0")
+      .option("orc.column.encoding.direct", "name")
+      .save("users_with_options.orc");
+    // $example off:manual_save_options_orc$
     // $example on:direct_sql$
     Dataset<Row> sqlDF =
       spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

examples/src/main/python/sql/datasource.py

Lines changed: 9 additions & 0 deletions
@@ -57,6 +57,15 @@ def basic_datasource_example(spark):
         format="csv", sep=":", inferSchema="true", header="true")
     # $example off:manual_load_options_csv$
 
+    # $example on:manual_save_options_orc$
+    df = spark.read.orc("examples/src/main/resources/users.orc")
+    (df.write.format("orc")
+        .option("orc.bloom.filter.columns", "favorite_color")
+        .option("orc.dictionary.key.threshold", "1.0")
+        .option("orc.column.encoding.direct", "name")
+        .save("users_with_options.orc"))
+    # $example off:manual_save_options_orc$
+
     # $example on:write_sorting_and_bucketing$
     df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
     # $example off:write_sorting_and_bucketing$

examples/src/main/r/RSparkSQLExample.R

Lines changed: 5 additions & 1 deletion
@@ -114,10 +114,14 @@ write.df(namesAndAges, "namesAndAges.parquet", "parquet")
 
 
 # $example on:manual_load_options_csv$
-df <- read.df("examples/src/main/resources/people.csv", "csv", sep=";", inferSchema=T, header=T)
+df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
 namesAndAges <- select(df, "name", "age")
 # $example off:manual_load_options_csv$
 
+# $example on:manual_save_options_orc$
+df <- read.df("examples/src/main/resources/users.orc", "orc")
+write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
+# $example off:manual_save_options_orc$
 
 # $example on:direct_sql$
 df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

examples/src/main/resources/users.orc

547 bytes (binary file not shown)

examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Lines changed: 7 additions & 0 deletions
@@ -56,6 +56,13 @@ object SQLDataSourceExample {
       .option("header", "true")
       .load("examples/src/main/resources/people.csv")
     // $example off:manual_load_options_csv$
+    // $example on:manual_save_options_orc$
+    usersDF.write.format("orc")
+      .option("orc.bloom.filter.columns", "favorite_color")
+      .option("orc.dictionary.key.threshold", "1.0")
+      .option("orc.column.encoding.direct", "name")
+      .save("users_with_options.orc")
+    // $example off:manual_save_options_orc$
 
     // $example on:direct_sql$
     val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
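Note: reading the file back needs no special options, since the writer properties above only shape the on-disk layout (bloom filter streams and column encodings), not the schema:

    val readBack = spark.read.orc("users_with_options.orc")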

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala

Lines changed: 5 additions & 3 deletions
@@ -171,7 +171,7 @@ object RowEncoder {
 
     if (inputObject.nullable) {
       If(IsNull(inputObject),
-        Literal.create(null, inputType),
+        Literal.create(null, nonNullOutput.dataType),
         nonNullOutput)
     } else {
       nonNullOutput
 
@@ -187,7 +187,9 @@ object RowEncoder {
     val convertedField = if (field.nullable) {
       If(
         Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: Nil),
-        Literal.create(null, field.dataType),
+        // Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
+        // We should use `fieldValue.dataType` here.
+        Literal.create(null, fieldValue.dataType),
         fieldValue
       )
     } else {
 
@@ -198,7 +200,7 @@ object RowEncoder {
 
     if (inputObject.nullable) {
       If(IsNull(inputObject),
-        Literal.create(null, inputType),
+        Literal.create(null, nonNullOutput.dataType),
         nonNullOutput)
     } else {
       nonNullOutput
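Note: the comment added in the middle hunk is the heart of the fix. RowEncoder strips user-defined types down to their underlying SQL type, so the serializer expression for a UDT field produces values of udt.sqlType while field.dataType is still the UDT itself; typing the null branch of the If with field.dataType therefore gave the two branches different types. A minimal sketch of that UDT/sqlType split (Point and PointUDT are hypothetical, and UserDefinedType is internal, private[spark] API, so this only compiles inside Spark's own packages):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    class Point(val x: Double, val y: Double) extends Serializable

    class PointUDT extends UserDefinedType[Point] {
      // The SQL-level type the serializer actually emits: a struct, not the UDT.
      override def sqlType: DataType =
        new StructType().add("x", DoubleType).add("y", DoubleType)
      override def serialize(p: Point): Any = InternalRow(p.x, p.y)
      override def deserialize(datum: Any): Point = datum match {
        case row: InternalRow => new Point(row.getDouble(0), row.getDouble(1))
      }
      override def userClass: Class[Point] = classOf[Point]
    }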

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -273,6 +273,14 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     assert(e4.getMessage.contains("java.lang.String is not a valid external type"))
   }
 
+  test("SPARK-25791: Datatype of serializers should be accessible") {
+    val udtSQLType = new StructType().add("a", IntegerType)
+    val pythonUDT = new PythonUserDefinedType(udtSQLType, "pyUDT", "serializedPyClass")
+    val schema = new StructType().add("pythonUDT", pythonUDT, true)
+    val encoder = RowEncoder(schema)
+    assert(encoder.serializer(0).dataType == pythonUDT.sqlType)
+  }
+
   for {
     elementType <- Seq(IntegerType, StringType)
     containsNull <- Seq(true, false)
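Note: the new test exercises exactly the mismatch fixed above. Before the fix, the serializer's If had a null branch typed with the UDT and a value branch typed with its sqlType, and an If with disagreeing branches cannot report a consistent dataType. A sketch of how catalyst flags such a branch mismatch (the literals are arbitrary):

    import org.apache.spark.sql.catalyst.expressions.{If, Literal}
    import org.apache.spark.sql.types._

    // Branch types (int vs. double) differ, so the type check fails.
    val mismatched = If(Literal(true), Literal.create(null, IntegerType), Literal(1.0))
    assert(!mismatched.checkInputDataTypes().isSuccess)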
