
Commit dfd4e2e

feat: review Spark SQL
1 parent 31799d4 commit dfd4e2e

File tree

1 file changed: +44 −8 lines


study-spark/spark-sql/src/main/scala/com/spark/sql/SparkHive.scala

Lines changed: 44 additions & 8 deletions
@@ -1,20 +1,21 @@
 package com.spark.sql
 
+
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 
 /**
- * @fileName: SparkHive.java
- * @description: SparkHive.java class description
- * @author: by echo huang
- * @date: 2020-06-29 10:27
- */
+ * @fileName: SparkHive.java
+ * @description: SparkHive.java class description
+ * @author: by echo huang
+ * @date: 2020-06-29 10:27
+ */
 object SparkHive extends App {
   override def main(args: Array[String]): Unit = {
-    testYarn()
+    //    testYarn()
     //    val sparkBuilder = SparkSession.builder()
     //      .master("local[*]")
     //      .appName("hive")
-    //      .config("spark.driver.memory", "4g")
+    //      .config("spark.driver.memory", "4g")
     //      .config("spark.num.executors", "4")
     //      .config("spark.executor.memory", "2g")
     //      .config("spark.executor.cores", "4")
@@ -26,6 +27,41 @@ object SparkHive extends App {
     //    val frame: DataFrame = spark.sql("select * from forchange_prod.user_orders")
     //    frame.show(20)
     //    spark.close()
+    val spark: SparkSession = SparkSession.builder().master("local[*]")
+      .appName("hive")
+      .config("spark.shuffle.manager", "sort")
+      .config("hive.exec.dynamic.partition", "true")
+      .config("hive.exec.dynamic.partition.mode", "nonstrict")
+      .config("hive.exec.max.dynamic.partitions", 2048)
+      .config("spark.sql.files.maxPartitionBytes", 134217728)
+      .config("spark.sql.shuffle.partitions", 200)
+      .config("spark.sql.inMemoryColumnarStorage.compressed", value = true)
+      // Whether to enable the bypass mechanism: if the number of partitions is below this threshold, a hash-style shuffle write is used directly, provided the shuffle map side does no pre-aggregation
+      .config("spark.shuffle.sort.bypassMergeThreshold", 300)
+      .config("spark.shuffle.compress", value = true)
+      .config("spark.shuffle.file.buffer", "512k")
+      .config("spark.shuffle.io.numConnectionsPerPeer", 5)
+      .config("spark.shuffle.spill.compress", value = true)
+      .config("spark.io.compression.codec", "snappy")
+      .config("spark.driver.memory", "1g")
+      .config("spark.num.executors", "3")
+      .config("spark.executor.memory", "2g")
+      .config("spark.executor.cores", "3")
+      .config("spark.default.parallelism", "10")
+      .config("spark.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
+      .config("spark.sql.parquet.writeLegacyFormat", "true")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    // spark.sql("show databases").show()
+
+    spark.sql("use wh_dwd")
+    spark.sql("show tables").show()
+
+    val startLog: DataFrame = spark.table("dwd_start_log")
+
+    startLog.show()
+    spark.stop()
   }
 
 
@@ -47,7 +83,7 @@ object SparkHive extends App {
       .set("spark.driver.host", "192.168.6.35")
       // Set the jar path; any additional dependency jars can be added here, comma-separated
       .setJars(List(""
-      ))
+      ))
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     val sc = new SparkContext(conf)
     val input = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0))
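
Worth noting: the new builder enables Hive support and dynamic partitioning (hive.exec.dynamic.partition=true, nonstrict mode, up to 2048 partitions), but the committed code only reads dwd_start_log and never writes. As a minimal sketch of how those settings would actually be exercised on a write path (not part of the commit; the target table wh_dwd.dwd_start_log_backup is hypothetical and assumed to already exist as a partitioned Hive table), something like the following appends rows and lets the partition values come from the data itself:

    import org.apache.spark.sql.DataFrame

    // Sketch only: insertInto matches columns by position, so startLog's columns must
    // line up with the target table's schema, with the partition column(s) last.
    def appendWithDynamicPartitions(startLog: DataFrame): Unit = {
      startLog.write
        .mode("append")                             // add new rows/partitions, keep existing data
        .insertInto("wh_dwd.dwd_start_log_backup")  // hypothetical partitioned Hive table
    }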
