Skip to content

Commit

Permalink
added timeseries
Browse files Browse the repository at this point in the history
  • Loading branch information
TedBear42 committed Aug 10, 2017
1 parent 67aeeb8 commit 605beb5
Show file tree
Hide file tree
Showing 187 changed files with 559 additions and 11 deletions.
1 change: 1 addition & 0 deletions data/datamodeling/GoogleAnaltyticsPivotSample.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{ "reportRequests":[ { "viewId":"XXXX", "dateRanges":[ { "endDate":"2015-06-30", "startDate":"2015-06-15" }], "dimensions":[ { "name":"ga:browser" }, { "name":"ga:campaign" }], "metrics":[ { "alias":"sessions", "expression":"ga:sessions" }], "pivots":[ { "dimensions":[ { "name":"ga:userAgeBracket" }], "startGroup":"0", "maxGroupCount":"3", "metrics":[ { "alias":"sessions", "expression":"ga:sessions" }, { "alias":"pageviews", "expression":"ga:pageviews" }] }] }]}
36 changes: 36 additions & 0 deletions data/timeseries/leadlag.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{"group":"A", "ts":1, "value":100}
{"group":"A", "ts":2, "value":101}
{"group":"A", "ts":3, "value":102}
{"group":"A", "ts":4, "value":103}
{"group":"A", "ts":5, "value":104}
{"group":"A", "ts":6, "value":105}
{"group":"A", "ts":7, "value":103}
{"group":"B", "ts":12, "value":101}
{"group":"B", "ts":13, "value":102}
{"group":"B", "ts":14, "value":103}
{"group":"B", "ts":15, "value":104}
{"group":"B", "ts":16, "value":103}
{"group":"B", "ts":17, "value":103}
{"group":"B", "ts":18, "value":102}
{"group":"B", "ts":19, "value":105}
{"group":"A", "ts":8, "value":102}
{"group":"A", "ts":9, "value":105}
{"group":"A", "ts":11, "value":100}
{"group":"A", "ts":12, "value":101}
{"group":"A", "ts":13, "value":102}
{"group":"A", "ts":14, "value":103}
{"group":"A", "ts":15, "value":104}
{"group":"A", "ts":16, "value":105}
{"group":"A", "ts":17, "value":103}
{"group":"A", "ts":18, "value":102}
{"group":"A", "ts":19, "value":105}
{"group":"B", "ts":1, "value":100}
{"group":"B", "ts":2, "value":101}
{"group":"B", "ts":3, "value":102}
{"group":"B", "ts":4, "value":103}
{"group":"B", "ts":5, "value":104}
{"group":"B", "ts":6, "value":105}
{"group":"B", "ts":7, "value":103}
{"group":"B", "ts":8, "value":102}
{"group":"B", "ts":9, "value":105}
{"group":"B", "ts":11, "value":106}
32 changes: 32 additions & 0 deletions data/timeseries/session.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{"group":"A", "ts":1, "value":100}
{"group":"A", "ts":2, "value":101}
{"group":"A", "ts":3, "value":102}
{"group":"A", "ts":4, "value":103}
{"group":"A", "ts":15, "value":104}
{"group":"A", "ts":16, "value":105}
{"group":"A", "ts":17, "value":103}
{"group":"A", "ts":28, "value":104}
{"group":"A", "ts":29, "value":105}
{"group":"A", "ts":30, "value":103}
{"group":"A", "ts":41, "value":100}
{"group":"A", "ts":42, "value":101}
{"group":"A", "ts":53, "value":102}
{"group":"A", "ts":54, "value":103}
{"group":"A", "ts":55, "value":102}
{"group":"A", "ts":56, "value":103}
{"group":"B", "ts":12, "value":101}
{"group":"B", "ts":13, "value":102}
{"group":"B", "ts":14, "value":103}
{"group":"B", "ts":15, "value":104}
{"group":"B", "ts":16, "value":103}
{"group":"B", "ts":17, "value":103}
{"group":"B", "ts":18, "value":102}
{"group":"B", "ts":19, "value":105}
{"group":"B", "ts":22, "value":101}
{"group":"B", "ts":23, "value":102}
{"group":"B", "ts":34, "value":103}
{"group":"B", "ts":35, "value":104}
{"group":"B", "ts":36, "value":103}
{"group":"B", "ts":37, "value":103}
{"group":"B", "ts":48, "value":102}
{"group":"B", "ts":49, "value":105}
6 changes: 3 additions & 3 deletions derby.log
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
----------------------------------------------------------------
Fri Jul 28 07:51:20 PDT 2017:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015d-89ae-2f6f-00000fb786a0
on database directory /Users/tmalaska/Documents/projects/spark_training/metastore_db with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@27a7ef08
Wed Aug 09 07:41:01 PDT 2017:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015d-c771-0ee8-00000fb19138
on database directory /Users/tmalaska/Documents/projects/spark_training/metastore_db with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@b965857
Loaded from file:/Users/tmalaska/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
java.vendor=Oracle Corporation
java.runtime.version=1.8.0_111-b14
Expand Down
9 changes: 9 additions & 0 deletions metastore_db/README_DO_NOT_TOUCH_FILES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY AND SUBDIRECTORIES CONSTITUTE A DERBY ***
# *** DATABASE, WHICH INCLUDES THE DATA (USER AND SYSTEM) AND THE ***
# *** FILES NECESSARY FOR DATABASE RECOVERY. ***
# *** EDITING, ADDING, OR DELETING ANY OF THESE FILES MAY CAUSE DATA ***
# *** CORRUPTION AND LEAVE THE DATABASE IN A NON-RECOVERABLE STATE. ***
# *************************************************************************
Binary file added metastore_db/db.lck
Binary file not shown.
Binary file added metastore_db/dbex.lck
Binary file not shown.
8 changes: 8 additions & 0 deletions metastore_db/log/README_DO_NOT_TOUCH_FILES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY ARE USED BY THE DERBY DATABASE RECOVERY ***
# *** SYSTEM. EDITING, ADDING, OR DELETING FILES IN THIS DIRECTORY ***
# *** WILL CAUSE THE DERBY RECOVERY SYSTEM TO FAIL, LEADING TO ***
# *** NON-RECOVERABLE CORRUPT DATABASES. ***
# *************************************************************************
Binary file added metastore_db/log/log.ctrl
Binary file not shown.
Binary file added metastore_db/log/log1.dat
Binary file not shown.
Binary file added metastore_db/log/logmirror.ctrl
Binary file not shown.
8 changes: 8 additions & 0 deletions metastore_db/seg0/README_DO_NOT_TOUCH_FILES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

# *************************************************************************
# *** DO NOT TOUCH FILES IN THIS DIRECTORY! ***
# *** FILES IN THIS DIRECTORY ARE USED BY THE DERBY DATABASE TO STORE ***
# *** USER AND SYSTEM DATA. EDITING, ADDING, OR DELETING FILES IN THIS ***
# *** DIRECTORY WILL CORRUPT THE ASSOCIATED DERBY DATABASE AND MAKE ***
# *** IT NON-RECOVERABLE. ***
# *************************************************************************
Binary file added metastore_db/seg0/c10.dat
Binary file not shown.
Binary file added metastore_db/seg0/c101.dat
Binary file not shown.
Binary file added metastore_db/seg0/c111.dat
Binary file not shown.
Binary file added metastore_db/seg0/c121.dat
Binary file not shown.
Binary file added metastore_db/seg0/c130.dat
Binary file not shown.
Binary file added metastore_db/seg0/c141.dat
Binary file not shown.
Binary file added metastore_db/seg0/c150.dat
Binary file not shown.
Binary file added metastore_db/seg0/c161.dat
Binary file not shown.
Binary file added metastore_db/seg0/c171.dat
Binary file not shown.
Binary file added metastore_db/seg0/c180.dat
Binary file not shown.
Binary file added metastore_db/seg0/c191.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1c0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1e0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c1f1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c20.dat
Binary file not shown.
Binary file added metastore_db/seg0/c200.dat
Binary file not shown.
Binary file added metastore_db/seg0/c211.dat
Binary file not shown.
Binary file added metastore_db/seg0/c221.dat
Binary file not shown.
Binary file added metastore_db/seg0/c230.dat
Binary file not shown.
Binary file added metastore_db/seg0/c241.dat
Binary file not shown.
Binary file added metastore_db/seg0/c251.dat
Binary file not shown.
Binary file added metastore_db/seg0/c260.dat
Binary file not shown.
Binary file added metastore_db/seg0/c271.dat
Binary file not shown.
Binary file added metastore_db/seg0/c281.dat
Binary file not shown.
Binary file added metastore_db/seg0/c290.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2d0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c2f0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c300.dat
Binary file not shown.
Binary file added metastore_db/seg0/c31.dat
Binary file not shown.
Binary file added metastore_db/seg0/c311.dat
Binary file not shown.
Binary file added metastore_db/seg0/c321.dat
Binary file not shown.
Binary file added metastore_db/seg0/c331.dat
Binary file not shown.
Binary file added metastore_db/seg0/c340.dat
Binary file not shown.
Binary file added metastore_db/seg0/c351.dat
Binary file not shown.
Binary file added metastore_db/seg0/c361.dat
Binary file not shown.
Binary file added metastore_db/seg0/c371.dat
Binary file not shown.
Binary file added metastore_db/seg0/c380.dat
Binary file not shown.
Binary file added metastore_db/seg0/c391.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3c0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c3f1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c400.dat
Binary file not shown.
Binary file added metastore_db/seg0/c41.dat
Binary file not shown.
Binary file added metastore_db/seg0/c411.dat
Binary file not shown.
Binary file added metastore_db/seg0/c421.dat
Binary file not shown.
Binary file added metastore_db/seg0/c430.dat
Binary file not shown.
Binary file added metastore_db/seg0/c441.dat
Binary file not shown.
Binary file added metastore_db/seg0/c451.dat
Binary file not shown.
Binary file added metastore_db/seg0/c461.dat
Binary file not shown.
Binary file added metastore_db/seg0/c470.dat
Binary file not shown.
Binary file added metastore_db/seg0/c481.dat
Binary file not shown.
Binary file added metastore_db/seg0/c490.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4b0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c4f0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c501.dat
Binary file not shown.
Binary file added metastore_db/seg0/c51.dat
Binary file not shown.
Binary file added metastore_db/seg0/c510.dat
Binary file not shown.
Binary file added metastore_db/seg0/c521.dat
Binary file not shown.
Binary file added metastore_db/seg0/c530.dat
Binary file not shown.
Binary file added metastore_db/seg0/c541.dat
Binary file not shown.
Binary file added metastore_db/seg0/c550.dat
Binary file not shown.
Binary file added metastore_db/seg0/c561.dat
Binary file not shown.
Binary file added metastore_db/seg0/c570.dat
Binary file not shown.
Binary file added metastore_db/seg0/c581.dat
Binary file not shown.
Binary file added metastore_db/seg0/c590.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5b0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5d0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c5f0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c60.dat
Binary file not shown.
Binary file added metastore_db/seg0/c601.dat
Binary file not shown.
Binary file added metastore_db/seg0/c610.dat
Binary file not shown.
Binary file added metastore_db/seg0/c621.dat
Binary file not shown.
Binary file added metastore_db/seg0/c630.dat
Binary file not shown.
Binary file added metastore_db/seg0/c641.dat
Binary file not shown.
Binary file added metastore_db/seg0/c650.dat
Binary file not shown.
Binary file added metastore_db/seg0/c661.dat
Binary file not shown.
Binary file added metastore_db/seg0/c670.dat
Binary file not shown.
Binary file added metastore_db/seg0/c681.dat
Binary file not shown.
Binary file added metastore_db/seg0/c690.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6b0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6d0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c6f0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c701.dat
Binary file not shown.
Binary file added metastore_db/seg0/c71.dat
Binary file not shown.
Binary file added metastore_db/seg0/c711.dat
Binary file not shown.
Binary file added metastore_db/seg0/c721.dat
Binary file not shown.
Binary file added metastore_db/seg0/c731.dat
Binary file not shown.
Binary file added metastore_db/seg0/c741.dat
Binary file not shown.
Binary file added metastore_db/seg0/c751.dat
Binary file not shown.
Binary file added metastore_db/seg0/c761.dat
Binary file not shown.
Binary file added metastore_db/seg0/c771.dat
Binary file not shown.
Binary file added metastore_db/seg0/c781.dat
Binary file not shown.
Binary file added metastore_db/seg0/c791.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c7f1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c801.dat
Binary file not shown.
Binary file added metastore_db/seg0/c81.dat
Binary file not shown.
Binary file added metastore_db/seg0/c811.dat
Binary file not shown.
Binary file added metastore_db/seg0/c821.dat
Binary file not shown.
Binary file added metastore_db/seg0/c831.dat
Binary file not shown.
Binary file added metastore_db/seg0/c840.dat
Binary file not shown.
Binary file added metastore_db/seg0/c851.dat
Binary file not shown.
Binary file added metastore_db/seg0/c860.dat
Binary file not shown.
Binary file added metastore_db/seg0/c871.dat
Binary file not shown.
Binary file added metastore_db/seg0/c880.dat
Binary file not shown.
Binary file added metastore_db/seg0/c891.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8a0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8c1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8e1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c8f1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c90.dat
Binary file not shown.
Binary file added metastore_db/seg0/c901.dat
Binary file not shown.
Binary file added metastore_db/seg0/c911.dat
Binary file not shown.
Binary file added metastore_db/seg0/c920.dat
Binary file not shown.
Binary file added metastore_db/seg0/c931.dat
Binary file not shown.
Binary file added metastore_db/seg0/c940.dat
Binary file not shown.
Binary file added metastore_db/seg0/c951.dat
Binary file not shown.
Binary file added metastore_db/seg0/c960.dat
Binary file not shown.
Binary file added metastore_db/seg0/c971.dat
Binary file not shown.
Binary file added metastore_db/seg0/c981.dat
Binary file not shown.
Binary file added metastore_db/seg0/c990.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9a1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9b1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9c0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9d1.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9e0.dat
Binary file not shown.
Binary file added metastore_db/seg0/c9f1.dat
Binary file not shown.
Binary file added metastore_db/seg0/ca01.dat
Binary file not shown.
Binary file added metastore_db/seg0/ca1.dat
Binary file not shown.
Binary file added metastore_db/seg0/ca11.dat
Binary file not shown.
Binary file added metastore_db/seg0/ca21.dat
Binary file not shown.
Binary file added metastore_db/seg0/cb1.dat
Binary file not shown.
Binary file added metastore_db/seg0/cc0.dat
Binary file not shown.
Binary file added metastore_db/seg0/cd1.dat
Binary file not shown.
Binary file added metastore_db/seg0/ce1.dat
Binary file not shown.
Binary file added metastore_db/seg0/cf0.dat
Binary file not shown.
23 changes: 23 additions & 0 deletions metastore_db/service.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#/Users/tmalaska/Documents/projects/spark_training/metastore_db
# ********************************************************************
# *** Please do NOT edit this file. ***
# *** CHANGING THE CONTENT OF THIS FILE MAY CAUSE DATA CORRUPTION. ***
# ********************************************************************
#Sun Jul 30 14:48:13 PDT 2017
SysschemasIndex2Identifier=225
SyscolumnsIdentifier=144
SysconglomeratesIndex1Identifier=49
SysconglomeratesIdentifier=32
SyscolumnsIndex2Identifier=177
SysschemasIndex1Identifier=209
SysconglomeratesIndex3Identifier=81
SystablesIndex2Identifier=129
SyscolumnsIndex1Identifier=161
derby.serviceProtocol=org.apache.derby.database.Database
SysschemasIdentifier=192
derby.storage.propertiesId=16
SysconglomeratesIndex2Identifier=65
derby.serviceLocale=en_US
SystablesIdentifier=96
SystablesIndex1Identifier=113
#--- last line, don't put anything after this line ---
Binary file added spark-warehouse/google_sample/._SUCCESS.crc
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ object JsonNestedExample {

def main(args: Array[String]): Unit = {

val jsonPath = args(0)

val isLocal = true
val isLocal = args(0).equalsIgnoreCase("l")
val jsonPath = args(1)
val outputTableName = args(2)

val sparkSession = if (isLocal) {
SparkSession.builder
Expand Down Expand Up @@ -52,13 +52,12 @@ object JsonNestedExample {
println("--Tree Schema")
jsonDf.schema.printTreeString()
println("--")
jsonDf.write.saveAsTable("json_hive_table")

jsonDf.write.saveAsTable("foobar")
jsonDf.write.saveAsTable(outputTableName)

sparkSession.sqlContext.sql("select * from json_hive_table").take(10).foreach(println)
sparkSession.sqlContext.sql("select * from " + outputTableName).take(10).foreach(println)

println("--")
/*
sparkSession.sqlContext.sql("select group, explode(nested) as n1 from json_table").createOrReplaceTempView("unnested")
sparkSession.sqlContext.sql("select * from unnested").printSchema()
Expand All @@ -68,6 +67,7 @@ object JsonNestedExample {
sparkSession.sqlContext.sql("select group, a.col1, a.col2 from json_table LATERAL VIEW explode(nested) as a").printSchema()
sparkSession.sqlContext.sql("select group, a.col1, a.col2 from json_table LATERAL VIEW explode(nested) as a").rdd.foreach(println)
*/
println("---")
/*
jsonDf.rdd.map(row => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ object NestedTableExample {
.add("nested", ArrayType(nestedSchema))

val populated2Df = spark.sqlContext.createDataFrame(rowRDD, definedSchema)

println("----")
populated1Df.collect().foreach(r => println(" BuiltExample:" + r))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.malaska.spark.training.timeseries

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object InfectionPointWindow {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

def main(args:Array[String]): Unit = {
val leadLagJson = args(0)

val isLocal = true

val sparkSession = if (isLocal) {
SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.parquet.compression.codec", "gzip")
.enableHiveSupport()
.getOrCreate()
} else {
SparkSession.builder
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.enableHiveSupport()
.getOrCreate()
}
println("---")

import sparkSession.implicits._

val leadLag = sparkSession.read.json(leadLagJson).as[JsonLeadLag]

leadLag.createOrReplaceTempView("leadlag")

sparkSession.sql("select * from leadlag").collect().foreach(println)

val leadLagDf = sparkSession.sql("SELECT " +
"group, ts, " +
"value as v_now, " +
"AVG(value) OVER (ORDER BY ts rows between 3 preceding and current row) as v_moving_avg, " +
"Min(value) OVER (ORDER BY ts rows between 3 preceding and current row) as v_moving_avg, " +
"Max(value) OVER (ORDER BY ts rows between 3 preceding and current row) as v_moving_avg " +
"FROM leadlag")

leadLagDf.collect().foreach(println)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.malaska.spark.training.timeseries

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object LeadLagExample {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

def main(args:Array[String]): Unit = {
val leadLagJson = args(0)

val isLocal = true

val sparkSession = if (isLocal) {
SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.parquet.compression.codec", "gzip")
.enableHiveSupport()
.getOrCreate()
} else {
SparkSession.builder
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.enableHiveSupport()
.getOrCreate()
}
println("---")

import sparkSession.implicits._

val leadLag = sparkSession.read.json(leadLagJson).as[JsonLeadLag]

leadLag.createOrReplaceTempView("leadlag")

sparkSession.sql("select * from leadlag").collect().foreach(println)

val leadLagDf = sparkSession.sql("SELECT " +
"group, ts, " +
"value as v_now, " +
"LEAD(value) OVER (PARTITION BY group ORDER BY ts) as v_after, " +
"LAG(value) OVER (PARTITION BY group ORDER BY ts) as v_before " +
"FROM leadlag")

leadLagDf.collect().foreach(println)

leadLagDf.createOrReplaceTempView("leadlag_stage2")

leadLagDf.printSchema()

sparkSession.sql("select " +
"group, ts, v_now, v_after, v_before, " +
"case " +
" when v_now < v_after and v_now < v_before then 'valley'" +
" when v_now > v_after and v_now > v_before then 'peak'" +
" else 'n/a' " +
"end " +
"from leadlag_stage2").collect().foreach(println)
}
}

case class JsonLeadLag(group:String, ts:Long, value:Long)
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.malaska.spark.training.timeseries

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable

object SessionWindowing {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

def main(args:Array[String]): Unit = {
val sessionJson = args(0)
val timeGap = args(1).toInt

val isLocal = true

val sparkSession = if (isLocal) {
SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.parquet.compression.codec", "gzip")
.enableHiveSupport()
.getOrCreate()
} else {
SparkSession.builder
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.enableHiveSupport()
.getOrCreate()
}
println("---")

import sparkSession.implicits._

val sessionDs = sparkSession.read.json(sessionJson).as[JsonLeadLag]

sessionDs.createOrReplaceTempView("session_table")

sparkSession.sql("select * from session_table").collect().foreach(println)

val sessionDefinitinonDf = sessionDs.rdd.map(r => {
(r.group, r)
}).groupByKey().flatMap{ case (group, jsonObjIt) =>

var lastStart:Long = -1
var lastEnd:Long = -1
var sessionCount = 1
var eventsInASession = 0

val sessionList = new mutable.MutableList[SessionDefinition]

jsonObjIt.toSeq.sortBy(r => r.ts).foreach(record => {
val ts = record.ts
eventsInASession += 1

if (lastStart == -1) {
lastStart = ts
} else if (ts > lastEnd + timeGap) {
sessionList += SessionDefinition(group, lastStart, lastEnd, lastEnd - lastStart, eventsInASession)
lastStart = ts
eventsInASession = 0
}
lastEnd = ts
})
sessionList
}

sessionDefinitinonDf.collect().foreach(println)

}
}

case class SessionDefinition(group:String, sessionStart:Long, sessionEnd:Long, sessionLength:Long, sessionEvents:Int)
Loading

0 comments on commit 605beb5

Please sign in to comment.