[RP] Files renamed and README updated.
rambabu-posa committed Nov 9, 2019
1 parent 0a5d2f0 commit fbe2035
Showing 32 changed files with 111 additions and 66 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -27,4 +27,7 @@ hs_err_pid*
/bin/
*.DS_Store
.idea/*
*.iml
target/*
project/target/*
project/project/*
59 changes: 42 additions & 17 deletions README.md
@@ -7,34 +7,59 @@ Welcome to Spark with Java, chapter 3. This chapter is all about the dataframe,
Labs:
* #200: `IngestionSchemaManipulationApp`: ingestion of a CSV, manipulation of schema structure post-ingestion

## Running the lab in Java

## Running PySpark
For information on running the Java lab, see chapter 1 in [Spark in Action, 2nd edition](http://jgp.net/sia).


## Running the lab using PySpark

Prerequisites:

You will need:
* `git`.
* Apache Spark (please refer to Appendix P, 'Spark in production: installation and a few tips').

1. Clone this project
Assume that cloned this project to ${MY_HOME_DIR}

2. cd ${MY_HOME_DIR}/src/main/python
git clone https://github.com/jgperrin/net.jgp.books.spark.ch03

2. Go to the lab in the Python directory

cd net.jgp.books.spark.ch03/src/main/python/lab200_ingestion_schema_manipulation/

3. Execute the following spark-submit command to create a jar file to our this application

3. Execute the following spark-submit command to run this application
```
spark-submit net/jgp/books/spark/ch03/lab200_ingestion_schema_manipulation/ingestionSchemaManipulationApp.py
```
```
spark-submit ingestionSchemaManipulationApp.py
```

## Running the lab in Scala

Prerequisites:

You will need:
* `git`.
* Apache Spark (please refer to Appendix P, 'Spark in production: installation and a few tips').


1. Clone this project

## Running Scala
git clone https://github.com/jgperrin/net.jgp.books.spark.ch03

1. Clone this project
Assume that cloned this project to ${MY_HOME_DIR}
2. cd net.jgp.books.spark.ch03

2. cd ${MY_HOME_DIR}
3. Package the application using the sbt command

3. Create application jar file
```mvn clean package```
```
sbt clean assembly
```

4. Execute the following spark-submit command to run this application
```
spark-submit --class net.jgp.books.spark.ch03.lab200_ingestion_schema_manipulation.IngestionSchemaManipulateApp target/sparkInAction2-chapter03-1.0.0-SNAPSHOT.jar
```
4. Run the Spark/Scala application using the spark-submit command shown below:

```
spark-submit --class net.jgp.books.spark.ch03.lab200_ingestion_schema_manipulation.IngestionSchemaManipulationScalaApp target/scala-2.11/SparkInAction2-Chapter03-assembly-1.0.0.jar
```

Notes:
1. Due to renaming the packages to match Java standards more closely, this project is not in sync with the book's MEAP prior to v10.
22 changes: 22 additions & 0 deletions build.sbt
@@ -0,0 +1,22 @@

name := "SparkInAction2-Chapter03"

version := "1.0.0"

scalaVersion := "2.11.11"

val sparkVersion = "2.4.4"

resolvers ++= Seq(
"apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
23 changes: 0 additions & 23 deletions pom.xml
@@ -13,18 +13,11 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<scala.full.version>2.11.11</scala.full.version>
<spark.version>2.4.4</spark.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
</properties>

<dependencies>
<!-- Scala Language Library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.full.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -80,22 +73,6 @@
<target>${java.version}</target>
</configuration>
</plugin>
<!--To enable scala features in a java project-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.full.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.0.3
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
Empty file removed src/__init__.py
Empty file removed src/main/__init__.py
1 change: 0 additions & 1 deletion src/main/python/__init__.py

This file was deleted.

@@ -3,20 +3,22 @@
@author rambabu.posa
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit,col,concat
import os

current_dir = os.path.dirname(__file__)
relative_path = "../../../../data/Restaurants_in_Wake_County_NC.csv"
absolute_file_path = os.path.join(current_dir, relative_path)

# Creates a session on a local master
spark = SparkSession.builder.appName("Restaurants in Wake County, NC") \
.master("local[*]").getOrCreate()


# Reads a CSV file with header, called
# Restaurants_in_Wake_County_NC.csv,
# stores it in a dataframe
df = spark.read.csv(header=True, inferSchema=True,
path="../../../data/Restaurants_in_Wake_County_NC.csv")
df = spark.read.csv(header=True, inferSchema=True,path=absolute_file_path)

print("*** Right after ingestion")
df.show(5)
@@ -47,11 +49,8 @@
print("*** Dataframe transformed")
df.show(5)


# for book only
drop_cols=["address2","zip","tel","dateStart",
"geoX","geoY","address1","datasetId"]
dfUsedForBook = df.drop(drop_cols)
dfUsedForBook = df.drop("address2","zip","tel","dateStart","geoX","geoY","address1","datasetId")

dfUsedForBook.show(5, 15)
# end
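
The `drop` change above is a genuine fix, not just a cleanup: in PySpark 2.4, `DataFrame.drop` takes column names as separate arguments, not a Python list. A minimal sketch of the difference (column names taken from the lab above):

```
# PySpark 2.4: DataFrame.drop(*cols) expects strings or Columns as
# separate arguments; passing a single Python list raises a TypeError.
dfUsedForBook = df.drop("address2", "zip", "tel", "dateStart",
                        "geoX", "geoY", "address1", "datasetId")  # works
# dfUsedForBook = df.drop(["address2", "zip"])  # TypeError in Spark 2.4
```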
@@ -5,7 +5,12 @@
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit,col,concat
from pyspark.sql import functions as F
import os

current_dir = os.path.dirname(__file__)
relative_path = "../../../../data/Restaurants_in_Wake_County_NC.csv"
absolute_file_path = os.path.join(current_dir, relative_path)

# Creates a session on a local master
spark = SparkSession.builder.appName("Schema introspection for restaurants in Wake County, NC") \
@@ -14,11 +19,10 @@
# Reads a CSV file with header, called
# Restaurants_in_Wake_County_NC.csv,
# stores it in a dataframe
df = spark.read.csv(header=True, inferSchema=True,
path="../../../data/Restaurants_in_Wake_County_NC.csv")
df = spark.read.csv(header=True, inferSchema=True,path=absolute_file_path)

# Let's transform our dataframe
df = df.withColumn("county", lit("Wake")) \
df = df.withColumn("county", F.lit("Wake")) \
.withColumnRenamed("HSISID", "datasetId") \
.withColumnRenamed("NAME", "name") \
.withColumnRenamed("ADDRESS1", "address1") \
@@ -33,7 +37,7 @@
.withColumnRenamed("Y", "geoY")

df = df.withColumn("id",
concat(col("state"), lit("_"), col("county"), lit("_"), col("datasetId")))
F.concat(F.col("state"), F.lit("_"), F.col("county"), F.lit("_"), F.col("datasetId")))

# NEW
#//////////////////////////////////////////////////////////////////
@@ -48,4 +52,5 @@
schemaAsJson = schema.prettyjson
print("*** Schema as JSON: " + schemaAsJson)

df.stop()
# Good to stop SparkSession at the end of the application
spark.stop()
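
One caveat in the lab above: `schema.prettyjson` mirrors Scala's `StructType.prettyJson`, but PySpark's `StructType` exposes `json()` and `jsonValue()` instead, so that line would raise an `AttributeError`. A minimal sketch of pretty-printing a schema in PySpark 2.4, assuming the `df` from the lab above:

```
import json

# PySpark's StructType serializes via json()/jsonValue(); pretty-print
# it with the standard library instead of a prettyJson attribute.
print(json.dumps(df.schema.jsonValue(), indent=2))
```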
@@ -6,14 +6,19 @@

from pyspark.sql import SparkSession
from pyspark.sql.functions import (lit,col,concat,split)
import os

current_dir = os.path.dirname(__file__)
relative_path = "../../../../data/Restaurants_in_Durham_County_NC.json"
absolute_file_path = os.path.join(current_dir, relative_path)

# Creates a session on a local master
spark = SparkSession.builder.appName("Restaurants in Durham County, NC") \
.master("local[*]").getOrCreate()

# Reads a JSON file called Restaurants_in_Durham_County_NC.json, stores
# it in a dataframe
df = spark.read.json("../../../data/Restaurants_in_Durham_County_NC.json")
df = spark.read.json(absolute_file_path)
print("*** Right after ingestion")
df.show(5)
df.printSchema()
@@ -5,14 +5,22 @@
"""
import util
from pyspark.sql import SparkSession
import os

current_dir = os.path.dirname(__file__)
relative_path1 = "../../../../data/Restaurants_in_Wake_County_NC.csv"
absolute_file_path1 = os.path.join(current_dir, relative_path1)

relative_path2 = "../../../../data/Restaurants_in_Durham_County_NC.json"
absolute_file_path2 = os.path.join(current_dir, relative_path2)

# Creates a session on a local master
spark = SparkSession.builder.appName("Union of two dataframes") \
.master("local[*]").getOrCreate()

df1 = spark.read.csv(path="../../../data/Restaurants_in_Wake_County_NC.csv",header=True)
df1 = spark.read.csv(path=absolute_file_path1,header=True,inferSchema=True)

df2 = spark.read.json("../../../data/Restaurants_in_Durham_County_NC.json")
df2 = spark.read.json(absolute_file_path2)


wakeRestaurantsDf = util.build_wake_restaurants_dataframe(df1)
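
The rest of this lab is cut off above; it unions the two normalized dataframes. A minimal sketch of that step, assuming a `durhamRestaurantsDf` produced by the matching `util` helper and a hypothetical `allRestaurantsDf` result name:

```
# A union requires matching schemas; unionByName is the safer choice
# when only the column order might differ between the two dataframes.
allRestaurantsDf = wakeRestaurantsDf.unionByName(durhamRestaurantsDf)
allRestaurantsDf.show(5)
```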
Empty file removed src/main/python/net/__init__.py
@@ -13,7 +13,7 @@ import org.apache.spark.sql.SparkSession
*
* @author rambabu.posa
*/
object IngestionSchemaManipulateApp {
object IngestionSchemaManipulationScalaApp {

/**
* main() is your entry point to the application.
@@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
*
* @author rambabu.posa
*/
object SchemaIntrospectApp {
object SchemaIntrospectionScalaApp {
/**
* main() is your entry point to the application.
*
@@ -8,7 +8,7 @@ import org.apache.spark.sql.SparkSession
*
* @author rambabu.posa
*/
object JsonIngestionSchemaManipulateApp {
object JsonIngestionSchemaManipulationScalaApp {

/**
* main() is your entry point to the application.
@@ -8,7 +8,7 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession}
*
* @author rambabu.posa
*/
object DataframesUnionApp {
object DataframeUnionScalaApp {

/**
* main() is your entry point to the application.
@@ -8,7 +8,7 @@ import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
*
* @author rambabu.posa
*/
object Array2DatasetApp {
object ArrayToDatasetScalaApp {

/**
* main() is your entry point to the application.
@@ -9,7 +9,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
*
* @author rambabu.posa
*/
object Array2Dataset2DataframeApp {
object ArrayToDatasetToDataframeScalaApp {

/**
* main() is your entry point to the application.
@@ -13,7 +13,7 @@ import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
*
* @author rambabu.posa
*/
object Csv2DatasetBook2DataframeApp {
object CsvToDatasetBookToDataframeScalaApp {

/**
* This is a mapper class that will convert a Row to an instance of Book.
