Commit

updated docs
mahmoudparsian committed Sep 1, 2019
1 parent 6e7e30d commit 5695e4b
Showing 9 changed files with 1,329 additions and 0 deletions.
13 changes: 13 additions & 0 deletions LICENSE.md
@@ -0,0 +1,13 @@
Copyright [2019] [Mahmoud Parsian]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
439 changes: 439 additions & 0 deletions PySpark_examples.html

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions SparkWordCount.java.jdk8.txt
@@ -0,0 +1,32 @@
package com.axiomine.spark.examples.wordcount;

import java.io.File;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
//http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
public class SparkWordCount {
    public static void main(String[] args) throws Exception {
        System.out.println(System.getProperty("hadoop.home.dir"));
        String inputPath = args[0];
        String outputPath = args[1];
        // Remove any previous output so saveAsTextFile() does not fail
        FileUtils.deleteQuietly(new File(outputPath));

        JavaSparkContext sc = new JavaSparkContext("local", "sparkwordcount");

        // Read the input file as an RDD of lines
        JavaRDD<String> rdd = sc.textFile(inputPath);

        // Split lines into words, pair each word with 1, then sum the counts per word
        JavaPairRDD<String, Integer> counts = rdd
            .flatMap(x -> Arrays.asList(x.split(" ")))
            .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
            .reduceByKey((x, y) -> x + y);

        counts.saveAsTextFile(outputPath);
        sc.close();
    }
}
18 changes: 18 additions & 0 deletions bigram.py
@@ -0,0 +1,18 @@
input_list = ['all', 'this', 'happened', 'more', 'or', 'less']
#
def find_bigrams(input_list):
    bigram_list = []
    for i in range(len(input_list)-1):
        print(i)
        bigram_list.append((input_list[i], input_list[i+1]))
    return bigram_list
#
bigrams = find_bigrams(input_list)
print(bigrams)
# will output:
#0
#1
#2
#3
#4
#[('all', 'this'), ('this', 'happened'), ('happened', 'more'), ('more', 'or'), ('or', 'less')]
375 changes: 375 additions & 0 deletions pyspark_bigram.txt

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions pyspark_session_2.txt
@@ -0,0 +1,57 @@
Interactive session: valid and tested on Feb. 23, 2015

mparsian@Mahmouds-MacBook:~/zmp/BigData-MapReduce-Course/pyspark# cat data.txt
crazy crazy fox jumped
crazy fox jumped
fox is fast
fox is smart
dog is smart

SPARK_HOME=~/zmp/zs/spark-1.2.0
mparsian@Mahmouds-MacBook:~/zmp/BigData-MapReduce-Course/pyspark# ~/zmp/zs/spark-1.2.0/bin/pyspark
Python 2.6.9 (unknown, Sep 9 2014, 15:05:12)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0
/_/

Using Python version 2.6.9 (unknown, Sep 9 2014 15:05:12)
SparkContext available as sc.
>>> sc
<pyspark.context.SparkContext object at 0x10ae02210>
>>> lines = sc.textFile("data.txt", 1)
>>> debuglines = lines.collect();
>>> debuglines
[u'crazy crazy fox jumped', u'crazy fox jumped', u'fox is fast', u'fox is smart', u'dog is smart']
>>> words = lines.flatMap(lambda x: x.split(' '))
>>> debugwords = words.collect();
>>> debugwords
[u'crazy', u'crazy', u'fox', u'jumped', u'crazy', u'fox', u'jumped', u'fox', u'is', u'fast', u'fox', u'is', u'smart', u'dog', u'is', u'smart']
>>> ones = words.map(lambda x: (x, 1))
>>> debugones = ones.collect()
>>> debugones
[(u'crazy', 1), (u'crazy', 1), (u'fox', 1), (u'jumped', 1), (u'crazy', 1), (u'fox', 1), (u'jumped', 1), (u'fox', 1), (u'is', 1), (u'fast', 1), (u'fox', 1), (u'is', 1), (u'smart', 1), (u'dog', 1), (u'is', 1), (u'smart', 1)]
>>> counts = ones.reduceByKey(lambda x, y: x + y)
>>> debugcounts = counts.collect()
>>> debugcounts
[(u'crazy', 3), (u'jumped', 2), (u'is', 3), (u'fox', 4), (u'dog', 1), (u'fast', 1), (u'smart', 2)]
>>>

>>> grouped = ones.groupByKey();
>>> debuggrouped = grouped.collect();

>>> counts.saveAsTextFile("output")

mparsian@Mahmouds-MacBook:~/zmp/BigData-MapReduce-Course/pyspark# cat output/part*
(u'crazy', 3)
(u'jumped', 2)
(u'is', 3)
(u'fox', 4)
(u'dog', 1)
(u'fast', 1)
(u'smart', 2)
234 changes: 234 additions & 0 deletions spark_in_action_blog.txt
@@ -0,0 +1,234 @@
Spark In Action: Processing Data



This blog is intended for data engineers and data scientists who are
planning to use Spark and Scala, and for anyone interested in learning
Spark by practice.

Understanding How to use Spark for Data Exploration

The goal of this blog is to provide a quick start to Apache Spark and show how to apply:

RDDs
Transformations
Actions
Joins
Spark SQL
Spark's automatic JSON schema inference
Caching
Lazy evaluation


Quick Intro to Spark

Apache Spark is often called a Swiss Army knife because of its ability to
handle many different tasks with ease. It can handle data cleansing, data
exploration, feature extraction, SQL queries, machine learning, complex
graph algorithms, building streaming applications, and more.

Getting Started with Spark


If you do not have a Spark setup on your machine, do not worry; it takes
less than two minutes to get started. Download Spark from the Apache Spark
website, extract it, and start the Scala REPL with $Spark_Home/bin/spark-shell.

Note:

$Spark_Home is the location where you extracted Spark.

Loading the Dataset


Download the required dataset from the GitHub repository and execute the
steps below in the spark-shell.

val rawData = sc.textFile("path/usagov_bitly_data2012-03-16-1331923249.txt")
rawData: org.apache.spark.rdd.RDD[String] = usagov_bitly_data2012-03-16-1331923249.txt MappedRDD[1] at textFile at <console>:12

We just created an RDD called rawData.

What is an RDD: In the simplest terms, an RDD is a data structure
distributed across many machines. RDDs are the core of Spark, and all
operations revolve around them.

Spark provides two kinds of operations: transformations and actions.

Transformations and Actions


What is a transformation: A transformation converts one RDD into another RDD.

What is an action: An action, on the other hand, produces an output, such as
printing the data or storing it.

In this case we created an RDD of type String.
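
Here is a minimal sketch of the difference in the spark-shell (the variable
names are only for illustration): a transformation such as map merely
describes a new RDD, while an action such as count actually runs the job.

val lineLengths = rawData.map(line => line.length)   // transformation: builds a new RDD, nothing runs yet
val totalLines  = lineLengths.count()                // action: runs the job and returns the number of elements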

What is sc: For the spark-shell, Spark creates a context called sc. It is the
main entry point for Spark functionality and lets you create RDDs,
accumulators, and broadcast variables.
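A tiny sketch of the latter two (the values here are purely illustrative):

val acc    = sc.accumulator(0)                             // a counter that tasks can add to
val lookup = sc.broadcast(Map("US" -> "United States"))    // a read-only value shipped once to each worker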

What is textFile: The textFile method reads data from the local file system,
the Hadoop file system, or any Hadoop-supported URI.
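textFile also accepts an optional minimum number of partitions; a small
sketch reusing the same file as above:

val rawDataPartitioned = sc.textFile("path/usagov_bitly_data2012-03-16-1331923249.txt", 4)   // ask for at least 4 partitions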

Before proceeding further, let's try to understand the dataset.

In 2011, the URL shortening service bit.ly partnered with the United States
government website usa.gov to provide a feed of anonymous data gathered from
users who shorten links ending with .gov or .mil. Each line in each file
contains a common form of web data known as JSON.

The key point is that we are going to deal with JSON data, and each line in
the file represents a JSON record.

Let's see what our RDD contains. Spark provides methods like collect,
take(n), and first to access the elements of an RDD.

collect(): Brings the entire dataset to the driver program; in our case the
driver program is the spark-shell. You cannot use collect when the dataset is
so large that it cannot fit into the memory of a single machine; use it only
when your dataset is small.

take(n): Complements the limitation of collect. It returns the first n
elements of the RDD.

first: Returns a single element from the RDD. Useful when you want to quickly
check whether your operations worked correctly.

Let's try first on our rawData: rawData.first

res0: String = { "a": "Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/535.11 (KHTML, like Gecko) Chrome\/17.0.963.78 Safari\/535.11", "c": "US", "nk": 1, "tz": "America\/New_York", "gr": "MA", "g": "A6qOVH", "h": "wfLQtf", "l": "orofrog", "al": "en-US,en;q=0.8", "hh": "1.usa.gov", "r": "http:\/\/www.facebook.com\/l\/7AQEFzjSi\/1.usa.gov\/wfLQtf", "u": "http:\/\/www.ncbi.nlm.nih.gov\/pubmed\/22415991", "t": 1331923247, "hc": 1331822918, "cy": "Danvers", "ll": [ 42.576698, -70.954903 ] }
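
Similarly, take lets you peek at a few records without pulling the whole RDD
to the driver; a minimal sketch (output omitted here):

rawData.take(2).foreach(println)   // prints the first two JSON lines
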
Spark SQL JSON Inference

To understand the dataset, we should bring the JSON data into a structured,
table-like form. For that we need to extract the elements. There are several
JSON libraries that can help with parsing, but we will use Spark SQL's
capability of inferring a JSON schema to structure our data.

import org.apache.spark.sql.SQLContext



val sqlContext = new SQLContext(sc)
val recordsJson = sqlContext.jsonRDD(rawData)
recordsJson.registerTempTable("records")

Spark SQL is a subproject of Apache Spark for dealing with structured data.
The beauty of it is that it works seamlessly with Spark programs. It provides
features to handle data from Hive, JSON, and Parquet, plus a Thrift service
that lets external programs connect, among many other capabilities.

Let's check the schema of our data. Spark SQL comes with a printSchema method:

recordsJson.printSchema
root
|-- _heartbeat_: integer (nullable = true)
|-- a: string (nullable = true)
|-- al: string (nullable = true)
|-- c: string (nullable = true)
|-- cy: string (nullable = true)
|-- g: string (nullable = true)
|-- gr: string (nullable = true)
|-- h: string (nullable = true)
|-- hc: integer (nullable = true)
|-- hh: string (nullable = true)
|-- kw: string (nullable = true)
|-- l: string (nullable = true)
|-- ll: array (nullable = true)
| |-- element: double (containsNull = false)
|-- nk: integer (nullable = true)
|-- r: string (nullable = true)
|-- t: integer (nullable = true)
|-- tz: string (nullable = true)
|-- u: string (nullable = true)

One advantage of Spark SQL is that we can still use the core Spark
functionality. Let's proceed with our exploration of the dataset by taking
one particular field, tz, which stands for time zone, and quickly write a
SQL query to extract it.

val x = sqlContext.sql("select tz from records")

Now x will contain only the time zone records.

Let's find how many times each time zone occurs in the dataset.

How can we solve this? Let's take each element and map it to 1 to form a
key-value pair.

x.map(row => row(0)).map(temp => (temp,1))
Here x is a SchemaRDD.

map(f): map is a transformation applied to an RDD. It accepts a function as
input and applies it to each element of the RDD in parallel.
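As a quick illustration of map on its own, here is a small sketch independent
of our dataset, building an RDD from a local collection with sc.parallelize:

val nums    = sc.parallelize(1 to 5)   // RDD[Int] built from a local Scala range
val doubled = nums.map(n => n * 2)     // transformation: applies the function to each element
doubled.collect()                      // action: Array(2, 4, 6, 8, 10)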

Let's see how the elements have been transformed. We will do take(5) on the above RDD:

(America/New_York,1)
(America/Denver,1)
(America/New_York,1)
(America/Sao_Paulo,1)
(America/New_York,1)

A simple question you might be asking yourself is why each field is paired
with 1. Basically, we converted our fields into key-value tuples.

Spark provides useful operations on key-value pair RDDs, such as reduceByKey,
groupByKey, countByKey, and many more.

Let's get back to our question: how many times does each time zone occur? As
you may have guessed, we use reduceByKey on the data.

x.map(row => row(0)).map(temp => (temp,1)).reduceByKey((x,y) => x+y).take(5).foreach(println)

(null,120)
(Europe/Brussels,4)
(Europe/Vienna,6)
(Europe/Athens,6)
(America/Puerto_Rico,10)

Note: Remember that transformations in Spark are lazy; they are not evaluated
until you perform an action such as count, first, collect, or saveAsTextFile.
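
A small sketch of this laziness, reusing the chain from above (the variable
names are only for illustration):

val tzPairs  = x.map(row => row(0)).map(temp => (temp, 1))   // transformations only: nothing is executed yet
val tzCounts = tzPairs.reduceByKey((a, b) => a + b)          // still a transformation, still lazy
tzCounts.count()                                             // the action triggers the whole chain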

Looking at the result, we can see that there are null values. Let's remove the
records that contain null values; Spark comes with a handy filter method.

x.map(row => row(0)).filter(x => x!=null).map(temp => (temp,1)).reduceByKey((x,y) => x+y).take(5).foreach(println)

(Europe/Brussels,4)
(Asia/Bangkok,6)
(Pacific/Honolulu,36)
(America/Santo_Domingo,1)
(Europe/Bucharest,4)

Let's see how we can sort the data. We will use sortBy.

x.map(row => row(0)).filter(x => x!=null).map(temp => (temp,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).take(5).foreach(println)

(America/New_York,1251)
(,521)
(America/Chicago,400)
(America/Los_Angeles,382)
(America/Denver,191)

Looking at the data, we can see that it still needs more cleansing, which I
will leave to you: use filter() on the RDD to remove the empty rows.
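
One possible sketch (the empty time zone shows up as an empty string, so it
can be filtered out alongside the nulls):

x.map(row => row(0)).filter(t => t != null && t != "").map(temp => (temp,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).take(5).foreach(println)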

We can simplify the entire task by writing a SQL query against the records table.

sqlContext.sql("select tz,count(tz) as total from records where tz != '' and tz is not NULL group by tz order by total desc").take(5).foreach(println)

[America/New_York,1251]
[America/Chicago,400]
[America/Los_Angeles,382]
[America/Denver,191]
[Europe/London,74]

In the next blog we will look at how joins, caching, and lazy evaluation work.

Leave your comments below if you found the blog useful or if you have any suggestions.

Binary file added tutorial/.DS_Store
Binary file not shown.