
Commit b12974e
PySpark Examples
1 parent 6aeb9e8 commit b12974e

22 files changed: +787 -12 lines changed

convert-column-python-list.py

+48
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName('SparkByExamples.com') \
    .getOrCreate()

data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL")]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
print(df.collect())

states1 = df.rdd.map(lambda x: x[3]).collect()
print(states1)
#['CA', 'NY', 'CA', 'FL']
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))
print(res)
#['CA', 'NY', 'FL']

#Example 2
states2 = df.rdd.map(lambda x: x.state).collect()
print(states2)
#['CA', 'NY', 'CA', 'FL']

states3 = df.select(df.state).collect()
print(states3)
#[Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]

states4 = df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)
#['CA', 'NY', 'CA', 'FL']

states5 = df.select(df.state).toPandas()['state']
states6 = list(states5)
print(states6)
#['CA', 'NY', 'CA', 'FL']

pandDF = df.select(df.state, df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))
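
A possible extension, not part of this commit: when only the distinct values are wanted, collect_set from pyspark.sql.functions de-duplicates inside Spark instead of in Python. A minimal sketch assuming the df above (the states7 name is just a continuation of this file's naming):

#Sketch: distinct states via collect_set (element order is not guaranteed)
from pyspark.sql.functions import collect_set
states7 = df.select(collect_set("state").alias("states")).collect()[0]["states"]
print(states7)
#e.g. ['CA', 'NY', 'FL']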

pyspark-add-new-column.py

+69
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()

data = [('James','Smith','M',3000),
        ('Anna','Rose','F',4100),
        ('Robert','Williams','M',6200),
       ]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

if 'salary1' not in df.columns:
    print("salary1 column is not present")

# Add new constant column
from pyspark.sql.functions import lit
df.withColumn("bonus_percent", lit(0.3)) \
  .show()

#Add column from existing column
df.withColumn("bonus_amount", df.salary*0.3) \
  .show()

#Add column by concatenating existing columns
from pyspark.sql.functions import concat_ws
df.withColumn("name", concat_ws(",","firstname",'lastname')) \
  .show()

#Add current date
from pyspark.sql.functions import current_date
df.withColumn("current_date", current_date()) \
  .show()

from pyspark.sql.functions import when
df.withColumn("grade",
    when((df.salary < 4000), lit("A"))
    .when((df.salary >= 4000) & (df.salary <= 5000), lit("B"))
    .otherwise(lit("C"))
  ).show()

# Add column using select
df.select("firstname","salary", lit(0.3).alias("bonus")).show()
df.select("firstname","salary", lit(df.salary * 0.3).alias("bonus_amount")).show()
df.select("firstname","salary", current_date().alias("today_date")).show()

#Add columns using SQL
df.createOrReplaceTempView("PER")
spark.sql("select firstname,salary, '0.3' as bonus from PER").show()
spark.sql("select firstname,salary, salary * 0.3 as bonus_amount from PER").show()
spark.sql("select firstname,salary, current_date() as today_date from PER").show()
spark.sql("select firstname,salary, " +
          "case when salary < 4000 then 'A' " +
          "else 'B' END as grade from PER").show()
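
A hedged addition, not from the commit: on PySpark 3.3+, DataFrame.withColumns adds several columns in one call; the dict keys below simply reuse column names from the examples above. A minimal sketch:

#Sketch: add multiple columns in one pass (requires PySpark 3.3+)
from pyspark.sql.functions import lit
df.withColumns({
    "bonus_percent": lit(0.3),
    "bonus_amount": df.salary * 0.3,
}).show()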

pyspark-arraytype.py

+44
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# A standalone ArrayType (shown for reference; the schema below builds its own)
arrayCol = ArrayType(StringType(), False)

data = [
    ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
    ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
    ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("languagesAtSchool", ArrayType(StringType()), True),
    StructField("languagesAtWork", ArrayType(StringType()), True),
    StructField("currentState", StringType(), True),
    StructField("previousState", StringType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()

from pyspark.sql.functions import explode
df.select(df.name, explode(df.languagesAtSchool)).show()

from pyspark.sql.functions import split
df.select(split(df.name, ",").alias("nameAsArray")).show()

from pyspark.sql.functions import array
df.select(df.name, array(df.currentState, df.previousState).alias("States")).show()

from pyspark.sql.functions import array_contains
df.select(df.name, array_contains(df.languagesAtSchool, "Java")
          .alias("array_contains")).show()
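
A hedged aside, not in the commit: array_union (Spark 2.4+) merges two array columns and drops duplicates, which fits the two language arrays above. A minimal sketch against the same df:

#Sketch: merge the two language arrays, de-duplicated
from pyspark.sql.functions import array_union
df.select(df.name, array_union(df.languagesAtSchool, df.languagesAtWork)
          .alias("allLanguages")).show(truncate=False)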

pyspark-change-string-double.py

+35
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()

simpleData = [("James","34","true","M","3000.6089"),
              ("Michael","33","true","F","3300.8067"),
              ("Robert","37","false","M","5000.5034")
             ]

columns = ["firstname","age","isGraduated","gender","salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)

from pyspark.sql.functions import col, round, expr
df.withColumn("salary", df.salary.cast('double')).printSchema()
df.withColumn("salary", df.salary.cast(DoubleType())).printSchema()
df.withColumn("salary", col("salary").cast('double')).printSchema()

#df.withColumn("salary", round(df.salary.cast(DoubleType()), 2)).show(truncate=False)
df.selectExpr("firstname","isGraduated","cast(salary as double) salary").printSchema()

df.createOrReplaceTempView("CastExample")
spark.sql("SELECT firstname,isGraduated,DOUBLE(salary) as salary from CastExample").printSchema()


#df.select("firstname", expr("age"), "isGraduated", col("salary").cast('float').alias("salary")).show()
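
A hedged aside, not in the commit: all of the string columns above can be cast in a single select; the df_casted name is illustrative only. A minimal sketch:

#Sketch: cast several columns in one pass
from pyspark.sql.functions import col
df_casted = df.select(col("firstname"),
                      col("age").cast("int"),
                      col("isGraduated").cast("boolean"),
                      col("gender"),
                      col("salary").cast("double"))
df_casted.printSchema()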

pyspark-convert-map-to-columns.py

+42
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dataDictionary = [
    ('James',{'hair':'black','eye':'brown'}),
    ('Michael',{'hair':'brown','eye':None}),
    ('Robert',{'hair':'red','eye':'black'}),
    ('Washington',{'hair':'grey','eye':'grey'}),
    ('Jefferson',{'hair':'brown','eye':''})
]

df = spark.createDataFrame(data=dataDictionary, schema=['name','properties'])
df.printSchema()
df.show(truncate=False)

df3 = df.rdd.map(lambda x: \
        (x.name, x.properties["hair"], x.properties["eye"])) \
        .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

df.withColumn("hair", df.properties.getItem("hair")) \
  .withColumn("eye", df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

df.withColumn("hair", df.properties["hair"]) \
  .withColumn("eye", df.properties["eye"]) \
  .drop("properties") \
  .show()

# Functions
from pyspark.sql.functions import explode, map_keys, col
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keysList = keysDF.rdd.map(lambda x: x[0]).collect()
keyCols = list(map(lambda x: col("properties").getItem(x).alias(str(x)), keysList))
df.select(df.name, *keyCols).show()
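
A hedged aside, not in this commit: element_at (Spark 2.4+) is an equivalent way to pull a single key out of the map column, returning null when the key is absent. A minimal sketch against the df above:

#Sketch: map lookup with element_at instead of getItem
from pyspark.sql.functions import element_at
df.select(df.name, element_at(df.properties, "hair").alias("hair")).show()
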
+52
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dataDictionary = [
    ('James',{'hair':'black','eye':'brown'}),
    ('Michael',{'hair':'brown','eye':None}),
    ('Robert',{'hair':'red','eye':'black'}),
    ('Washington',{'hair':'grey','eye':'grey'}),
    ('Jefferson',{'hair':'brown','eye':''})
]

df = spark.createDataFrame(data=dataDictionary, schema=['name','properties'])
df.printSchema()
df.show(truncate=False)

# Using StructType schema
from pyspark.sql.types import StructField, StructType, StringType, MapType, IntegerType
schema = StructType([
    StructField('Name', StringType(), True),
    StructField('properties', MapType(StringType(), StringType()), True)
])
df2 = spark.createDataFrame(data=dataDictionary, schema=schema)
df2.printSchema()
df2.show(truncate=False)

df3 = df.rdd.map(lambda x: \
        (x.name, x.properties["hair"], x.properties["eye"])) \
        .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

df.withColumn("hair", df.properties.getItem("hair")) \
  .withColumn("eye", df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

df.withColumn("hair", df.properties["hair"]) \
  .withColumn("eye", df.properties["eye"]) \
  .drop("properties") \
  .show()

# Functions
from pyspark.sql.functions import explode, map_keys, col
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keysList = keysDF.rdd.map(lambda x: x[0]).collect()
keyCols = list(map(lambda x: col("properties").getItem(x).alias(str(x)), keysList))
df.select(df.name, *keyCols).show()
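
A hedged note, not in the commit: MapType also takes an optional valueContainsNull flag (default True); with the None eye value in dataDictionary above, it must stay True or createDataFrame will reject that row. A minimal sketch:

#Sketch: explicit valueContainsNull flag on the map schema
from pyspark.sql.types import MapType, StringType
mapField = MapType(StringType(), StringType(), valueContainsNull=True)
print(mapField.simpleString())
#map<string,string>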

pyspark-explode-array-map.py

+2-2
@@ -15,7 +15,7 @@
 ('Robert',['CSharp',''],{'hair':'red','eye':''}),
 ('Washington',None,None),
 ('Jefferson',['1','2'],{})
-]
+]
 df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
 df.printSchema()
 df.show()
@@ -45,7 +45,7 @@
 
 from pyspark.sql.functions import posexplode_outer
 """ with array """
-df.select($"name",posexplode_outer($"knownLanguages")).show()
+df.select(df.name,posexplode_outer(df.knownLanguages)).show()
 
 """ with map """
 df.select(df.name,posexplode_outer(df.properties)).show()
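
A hedged aside, not part of this change: explode_outer is the non-positional counterpart of posexplode_outer and likewise keeps rows whose array is null (here, 'Washington') by emitting a null element. A minimal sketch against the df in this file:

#Sketch: explode that preserves null/empty rows
from pyspark.sql.functions import explode_outer
df.select(df.name, explode_outer(df.knownLanguages)).show()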

pyspark-filter.py

+20
@@ -37,15 +37,31 @@
 df.printSchema()
 df.show(truncate=False)
 
+#Equals
 df.filter(df.state == "OH") \
     .show(truncate=False)
 
+#Not equals
+df.filter(~(df.state == "OH")) \
+    .show(truncate=False)
+df.filter(df.state != "OH") \
+    .show(truncate=False)
+
 df.filter(col("state") == "OH") \
     .show(truncate=False)
 
 df.filter("gender == 'M'") \
     .show(truncate=False)
 
+df.filter("gender <> 'M'") \
+    .show(truncate=False)
+
+#IS IN
+li=["OH","CA","DE"]
+df.filter(df.state.isin(li)).show()
+#IS NOT IN
+df.filter(~df.state.isin(li)).show()
+
 df.filter( (df.state == "OH") & (df.gender == "M") ) \
     .show(truncate=False)
 
@@ -54,4 +70,8 @@
 
 df.filter(df.name.lastname == "Williams") \
     .show(truncate=False)
+
+df.filter(df.state.startswith("N")).show()
+df.filter(df.state.endswith("H")).show()
+df.filter(df.state.like("N%")).show()
 
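A hedged aside, not part of this change: Column also offers contains() and between() for substring and (lexicographic, on strings) range predicates, in the same spirit as the startswith/endswith/like filters added above. A minimal sketch against the same df:

#Sketch: substring and range filters
df.filter(df.state.contains("H")).show()
df.filter(df.state.between("CA", "NY")).show()
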
pyspark-filter2.py

+17
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data2 = [(1,"James Smith"), (2,"Michael Rose"),
         (3,"Robert Williams"), (4,"Rames Rose"),(5,"Rames rose")
        ]
df2 = spark.createDataFrame(data = data2, schema = ["id","name"])

df2.filter(df2.name.like("%rose%")).show()
df2.filter(df2.name.rlike("(?i).*rose$")).show()
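
A hedged alternative, not in the commit: instead of the (?i) inline flag, lower-casing the column first keeps the pattern itself simple. A minimal sketch with the df2 above:

#Sketch: case-insensitive match via lower() + like()
from pyspark.sql.functions import lower
df2.filter(lower(df2.name).like("%rose%")).show()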
