Skip to content

Commit 769a5d9

Browse files
Spark UDF Examples
1 parent e79ee2d commit 769a5d9

File tree

6 files changed

+70
-25
lines changed

6 files changed

+70
-25
lines changed

src/main/scala/com/sparkbyexamples/spark/dataframe/CastColumnType.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import org.apache.spark.sql.{Row, SparkSession}
44
import org.apache.spark.sql.types._
55
import org.apache.spark.sql.functions._
66

7-
object CastColumnType extends App{
7+
import org.apache.spark.sql.{Row, SparkSession}
8+
import org.apache.spark.sql.types._
9+
import org.apache.spark.sql.functions._
810

11+
object CastColumnType extends App{
912
val spark: SparkSession = SparkSession.builder()
1013
.master("local[1]")
1114
.appName("SparkByExamples.com")
@@ -25,26 +28,25 @@ object CastColumnType extends App{
2528
StructField("salary", DoubleType, true)
2629
))
2730

28-
val df = spark.createDataFrame(spark.sparkContext.parallelize(simpleData),simpleSchema)
31+
val df = spark.createDataFrame(
32+
spark.sparkContext.parallelize(simpleData),simpleSchema)
2933
df.printSchema()
3034
df.show(false)
3135

32-
//withColumn with the original column
3336
val df2 = df.withColumn("age",col("age").cast(StringType))
3437
.withColumn("isGraduated",col("isGraduated").cast(BooleanType))
3538
.withColumn("jobStartDate",col("jobStartDate").cast(DateType))
3639
df2.printSchema()
3740

38-
3941
val df3 = df2.selectExpr("cast(age as int) age",
4042
"cast(isGraduated as string) isGraduated",
4143
"cast(jobStartDate as string) jobStartDate")
4244
df3.printSchema()
4345
df3.show(false)
4446

4547
df3.createOrReplaceTempView("CastExample")
46-
val df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")
47-
df4.printSchema()
48-
df4.show(false)
49-
50-
}
48+
val df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated), " +
49+
"DATE(jobStartDate) from CastExample")
50+
df4.printSchema()
51+
df4.show(false)
52+
}

src/main/scala/com/sparkbyexamples/spark/dataframe/FromCSVFile2.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ object FromCSVFile2 {
1313

1414
val filePath="src/main/resources/stream.csv"
1515

16+
val df3 = spark.read.option("header",true).csv("src/main/resources/zipcodes.csv")
17+
df3.show(false)
18+
19+
1620
val df = spark.read.options(Map("inferSchema"->"true","delimiter"->"|","header"->"true")).csv(filePath)
1721

1822
val df2 = df.select("Gender", "BirthDate", "TotalCost", "TotalChildren", "ProductCategoryName")
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Demonstrates defining a plain Scala function, wrapping it as a Spark UDF,
 * and applying it both through the DataFrame API and from Spark SQL.
 */
object SparkUDF extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  import spark.implicits._

  val columns = Seq("Seqno", "Quote")
  val data = Seq(
    ("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy.")
  )
  val df = data.toDF(columns: _*)
  df.show(false)

  // Capitalizes the first letter of every space-separated word.
  // Fixes two defects in the original:
  //  - `f.substring(0, 1)` threw StringIndexOutOfBoundsException on empty
  //    tokens (empty input, leading/trailing or repeated spaces);
  //    `capitalize` is a safe no-op on "".
  //  - Spark passes null column values straight into Scala UDFs, which made
  //    the original NPE; Option(str) maps null -> null instead.
  val convertCase = (str: String) =>
    Option(str)
      .map(_.split(" ").map(_.capitalize).mkString(" "))
      .orNull

  // Using the UDF with the DataFrame API
  val convertUDF = udf(convertCase)
  df.select(col("Seqno"),
    convertUDF(col("Quote")).as("Quote")).show(false)

  // Registering the same function for use in Spark SQL
  spark.udf.register("convertUDF", convertCase)
  df.createOrReplaceTempView("QUOTE_TABLE")
  spark.sql("select Seqno, convertUDF(Quote) from QUOTE_TABLE").show(false)

}

src/main/scala/com/sparkbyexamples/spark/dataframe/functions/collection/MapToColumn.scala

Whitespace-only changes.

src/main/scala/com/sparkbyexamples/spark/dataframe/functions/datetime/DateInMilli.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@ object DateInMilli extends App{
1616

1717
val df = Seq(1).toDF("seq").select(
1818
current_date().as("current_date"),
19-
unix_timestamp().as("milliseconds")
20-
)
19+
unix_timestamp().as("unix_timestamp_seconds")
20+
)
2121

2222
df.printSchema()
2323
df.show(false)
2424

25-
//Convert milliseconds to date
25+
//Convert unix seconds to date
2626
df.select(
27-
to_date(col("milliseconds").cast(TimestampType)).as("current_date")
27+
to_date(col("unix_timestamp_seconds").cast(TimestampType)).as("current_date")
2828
).show(false)
2929

30-
//convert date to milliseconds
30+
//convert date to unix seconds
3131
df.select(
32-
unix_timestamp(col("current_date")).as("unix_milliseconds"),
33-
unix_timestamp(lit("12-21-2019"),"mm-DD-yyyy").as("unix_milliseconds2")
32+
unix_timestamp(col("current_date")).as("unix_seconds"),
33+
unix_timestamp(lit("12-21-2019"),"mm-DD-yyyy").as("unix_seconds2")
3434
).show(false)
3535

36-
}
36+
}

src/main/scala/com/sparkbyexamples/spark/dataframe/functions/datetime/TimeInMilli.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@ object TimeInMilli extends App{
1616

1717
val df = Seq(1).toDF("seq").select(
1818
current_timestamp().as("current_time"),
19-
unix_timestamp().as("milliseconds")
20-
)
19+
unix_timestamp().as("epoch_time_seconds")
20+
)
2121

2222
df.printSchema()
2323
df.show(false)
2424

25-
//Convert milliseconds to timestamp
25+
//Convert epoch_time to timestamp
2626
df.select(
27-
col("milliseconds").cast(TimestampType).as("current_time"),
28-
col("milliseconds").cast("timestamp").as("current_time2")
27+
col("epoch_time_seconds").cast(TimestampType).as("current_time"),
28+
col("epoch_time_seconds").cast("timestamp").as("current_time2")
2929
).show(false)
3030

31-
//convert timestamp to milliseconds
31+
//convert timestamp to Unix epoch time
3232
df.select(
33-
unix_timestamp(col("current_time")).as("unix_milliseconds"),
34-
col("current_time").cast(LongType).as("time_to_milli")
33+
unix_timestamp(col("current_time")).as("unix_epoch_time"),
34+
col("current_time").cast(LongType).as("unix_epoch_time2")
3535
).show(false)
3636

3737
}

0 commit comments

Comments
 (0)