Skip to content

Commit

Permalink
Create helper methods to read and write timestamped datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPowers committed Mar 20, 2019
1 parent fa8d4cb commit 93cb343
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,21 @@ object DataFrameExt {
.where(col("my_super_secret_count") === 1)
.drop(col("my_super_secret_count"))
}

/**
* Completely removes all duplicates from a DataFrame
*
*/
def killDuplicates(col1: String, cols: String*): DataFrame = {
df.killDuplicates((col1 +: cols).map(col):_*)
df.killDuplicates((col1 +: cols).map(col): _*)
}

/**
* Completely removes all duplicates from a DataFrame
*
*/
def killDuplicates(): DataFrame = {
df.killDuplicates(df.columns.map(col):_*)
df.killDuplicates(df.columns.map(col): _*)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.github.mrpowers.spark.daria.sql

import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

import scala.reflect.runtime.universe._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -212,6 +213,76 @@ object DataFrameHelpers extends DataFrameValidator {
case "TimestampType" => "TIMESTAMP"
case _ => "STRING"
}

}

lazy val spark: SparkSession = {
SparkSession
.builder()
.master("local")
.appName("spark session")
.getOrCreate()
}

def writeTimestamped(df: DataFrame, outputDirname: String, numPartitions: Option[Int] = None, overwriteLatest: Boolean = true): Unit = {
val timestamp: String = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())
val outputPath: String = outputDirname + "/" + timestamp
if (numPartitions.isEmpty) {
df.write.parquet(outputPath)
} else {
val p = numPartitions.get
df.repartition(p).write.parquet(outputPath)
}

if (overwriteLatest) {
val latestData = Seq(
Row(
outputPath
)
)

val latestSchema = List(
StructField(
"latest_path",
StringType,
false
)
)

val latestDF = spark.createDataFrame(
spark.sparkContext.parallelize(latestData),
StructType(latestSchema)
)

latestDF.write
.option(
"header",
"false"
)
.option(
"delimiter",
","
)
.mode(SaveMode.Overwrite)
.csv(outputDirname + "/latest")
}
}

def readTimestamped(dirname: String): DataFrame = {
val latestDF = spark.read
.option(
"header",
"false"
)
.option(
"delimiter",
","
)
.csv(dirname + "/latest")

val latestPath = latestDF.head().getString(0)

spark.read.parquet(latestPath)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,8 @@ object DataFrameExtTest extends TestSuite with DataFrameComparer with SparkSessi
("letter2", StringType, true),
("number1", IntegerType, true)
)
).killDuplicates()

)
.killDuplicates()

val expectedDF = spark.createDF(
List(
Expand Down Expand Up @@ -1131,8 +1131,11 @@ object DataFrameExtTest extends TestSuite with DataFrameComparer with SparkSessi
("letter2", StringType, true),
("number1", IntegerType, true)
)
).killDuplicates("letter1", "letter2")
df.show
)
.killDuplicates(
"letter1",
"letter2"
)

val expectedDF = spark.createDF(
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,42 @@ object DataFrameHelpersTest extends TestSuite with SparkSessionTestWrapper with

}

// 'writeTimestamped - {
//
// "writes out timestamped data and a latest file" - {
//
// val df = spark.createDF(
// List(
// ("giants", "football", 45),
// ("nacional", "soccer", 10)
// ),
// List(
// ("team", StringType, true),
// ("sport", StringType, true),
// ("goals_for", IntegerType, true)
// )
// )
//
// DataFrameHelpers.writeTimestamped(
// df,
// "/Users/powers/Documents/tmp/daria_timestamp_ex",
// numPartitions = Some(3),
// overwriteLatest = true
// )
//
// }
//
// }
//
// 'readTimestamped - {
//
// val df = DataFrameHelpers.readTimestamped(
// "/Users/powers/Documents/tmp/daria_timestamp_ex"
// )
// df.show()
//
// }

}

}

0 comments on commit 93cb343

Please sign in to comment.