@@ -100,7 +100,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
- df.show()
+ df.show()
{% endhighlight %}

</div>
@@ -151,10 +151,10 @@ val df = sqlContext.jsonFile("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
- // age name
+ // age name
// null Michael
- // 30 Andy
- // 19 Justin
+ // 30 Andy
+ // 19 Justin

// Print the schema in a tree format
df.printSchema()
@@ -164,17 +164,17 @@ df.printSchema()

// Select only the "name" column
df.select("name").show()
- // name
+ // name
// Michael
- // Andy
- // Justin
+ // Andy
+ // Justin

// Select everybody, but increment the age by 1
df.select("name", df("age") + 1).show()
// name (age + 1)
- // Michael null
- // Andy 31
- // Justin 20
+ // Michael null
+ // Andy 31
+ // Justin 20

// Select people older than 21
df.filter(df("age") > 21).show()
@@ -201,10 +201,10 @@ DataFrame df = sqlContext.jsonFile("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
- // age name
+ // age name
// null Michael
- // 30 Andy
- // 19 Justin
+ // 30 Andy
+ // 19 Justin

// Print the schema in a tree format
df.printSchema();
@@ -214,17 +214,17 @@ df.printSchema();

// Select only the "name" column
df.select("name").show();
- // name
+ // name
// Michael
- // Andy
- // Justin
+ // Andy
+ // Justin

// Select everybody, but increment the age by 1
df.select("name", df.col("age").plus(1)).show();
// name (age + 1)
- // Michael null
- // Andy 31
- // Justin 20
+ // Michael null
+ // Andy 31
+ // Justin 20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
@@ -251,10 +251,10 @@ df = sqlContext.jsonFile("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
- ## age name
+ ## age name
## null Michael
- ## 30 Andy
- ## 19 Justin
+ ## 30 Andy
+ ## 19 Justin

# Print the schema in a tree format
df.printSchema()
@@ -264,17 +264,17 @@ df.printSchema()

# Select only the "name" column
df.select("name").show()
- ## name
+ ## name
## Michael
- ## Andy
- ## Justin
+ ## Andy
+ ## Justin

# Select everybody, but increment the age by 1
df.select("name", df.age + 1).show()
## name (age + 1)
- ## Michael null
- ## Andy 31
- ## Justin 20
+ ## Michael null
+ ## Andy 31
+ ## Justin 20

# Select people older than 21
df.filter(df.age > 21).show()
@@ -907,6 +907,129 @@ SELECT * FROM parquetTable

</div>

+ ### Partition discovery
+
+ Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
+ table, data are usually stored in different directories, with partitioning column values encoded in
+ the path of each partition directory. The Parquet data source is now able to discover and infer
+ partitioning information automatically. For example, we can store all our previously used
+ population data in a partitioned table using the following directory structure, with two extra
+ columns, `sex` and `country`, as partitioning columns:
+
+ {% highlight text %}
+
+ path
+ └── to
+     └── table
+         ├── sex=0
+         │   ├── ...
+         │   │
+         │   ├── country=US
+         │   │   └── data.parquet
+         │   ├── country=CN
+         │   │   └── data.parquet
+         │   └── ...
+         └── sex=1
+             ├── ...
+             │
+             ├── country=US
+             │   └── data.parquet
+             ├── country=CN
+             │   └── data.parquet
+             └── ...
+
+ {% endhighlight %}
+
+ By passing `path/to/table` to either `SQLContext.parquetFile` or `SQLContext.load`, Spark SQL will
+ automatically extract the partitioning information from the paths. Now the schema of the returned
+ DataFrame becomes:
+
+ {% highlight text %}
+
+ root
+ |-- name: string (nullable = true)
+ |-- age: long (nullable = true)
+ |-- sex: string (nullable = true)
+ |-- country: string (nullable = true)
+
+ {% endhighlight %}
+
+ Notice that the data types of the partitioning columns are automatically inferred. Currently,
+ numeric data types and string type are supported.
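+
+ For example, loading and querying the partitioned table could look like the following
+ (a minimal sketch, assuming the directory layout above and the `sqlContext` from the earlier
+ examples; `people` is just an illustrative name):
+
+ {% highlight scala %}
+ // The partitioning columns (`sex` and `country`) are discovered from the directory paths
+ val people = sqlContext.parquetFile("path/to/table")
+
+ // They can then be used like any other column
+ people.filter(people("country") === "US").show()
+ {% endhighlight %}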
+
+ ### Schema merging
+
+ Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
+ a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
+ up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
+ source is now able to automatically detect this case and merge schemas of all these files.
+
+ <div class="codetabs">
+
+ <div data-lang="scala" markdown="1">
+
+ {% highlight scala %}
+ // sqlContext from the previous example is used in this example.
+ // This is used to implicitly convert an RDD to a DataFrame.
+ import sqlContext.implicits._
+
+ // Create a simple DataFrame, stored into a partition directory
+ val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
+ df1.saveAsParquetFile("data/test_table/key=1")
+
+ // Create another DataFrame in a new partition directory,
+ // adding a new column and dropping an existing column
+ val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
+ df2.saveAsParquetFile("data/test_table/key=2")
+
+ // Read the partitioned table
+ val df3 = sqlContext.parquetFile("data/test_table")
+ df3.printSchema()
+
+ // The final schema consists of all 3 columns in the Parquet files together
+ // with the partitioning column appearing in the partition directory paths.
+ // root
+ // |-- single: int (nullable = true)
+ // |-- double: int (nullable = true)
+ // |-- triple: int (nullable = true)
+ // |-- key: int (nullable = true)
+ {% endhighlight %}
+
+ </div>
+
+ <div data-lang="python" markdown="1">
+
+ {% highlight python %}
+ # sqlContext from the previous example is used in this example.
+ # Row is needed to construct the records below.
+ from pyspark.sql import Row
+
+ # Create a simple DataFrame, stored into a partition directory
+ df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))
+                                    .map(lambda i: Row(single=i, double=i * 2)))
+ df1.save("data/test_table/key=1", "parquet")
+
+ # Create another DataFrame in a new partition directory,
+ # adding a new column and dropping an existing column
+ df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
+                                    .map(lambda i: Row(single=i, triple=i * 3)))
+ df2.save("data/test_table/key=2", "parquet")
+
+ # Read the partitioned table
+ df3 = sqlContext.parquetFile("data/test_table")
+ df3.printSchema()
+
+ # The final schema consists of all 3 columns in the Parquet files together
+ # with the partitioning column appearing in the partition directory paths.
+ # root
+ # |-- single: int (nullable = true)
+ # |-- double: int (nullable = true)
+ # |-- triple: int (nullable = true)
+ # |-- key: int (nullable = true)
+ {% endhighlight %}
+
+ </div>
+
+ </div>
+
### Configuration

Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running
@@ -1429,10 +1552,10 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.

You may also use the beeline script that comes with Hive.

- Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
- Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
+ Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
+ Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:

- hive.server2.transport.mode - Set this to value: http
+ hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
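
For example, these settings might appear in the `hive-site.xml` file roughly as follows (a sketch
only; the port and endpoint values shown are simply the defaults listed above):

{% highlight xml %}
<property>
  <name>hive.server2.transport.mode</name>
  <value>http</value>
</property>
<property>
  <name>hive.server2.thrift.http.port</name>
  <value>10001</value>
</property>
<property>
  <name>hive.server2.http.endpoint</name>
  <value>cliservice</value>
</property>
{% endhighlight %}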