Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup for Serializable APIs on DataFrames #389

Merged
merged 12 commits into from
Dec 17, 2019
33 changes: 30 additions & 3 deletions src/main/scala/io/archivesunleashed/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
import java.net.URI
import java.net.URL
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType}
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.{RangePartitioner, SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scala.util.matching.Regex
Expand Down Expand Up @@ -83,6 +83,32 @@ package object archivesunleashed {
}
}

/**
* A Wrapper class around DF to allow Dfs of type ARCRecord and WARCRecord to be queried via a fluent API.
*
* To load such an DF, please use [[RecordLoader]] and apply .all() on it.
*/
implicit class WARecordDF(df: DataFrame) extends java.io.Serializable {

def keepValidPagesDF(): DataFrame = {

val spark = SparkSession.builder().master("local").getOrCreate()
// scalastyle:off
import spark.implicits._
// scalastyle:on

df.filter($"crawl_date" isNotNull)
.filter(!($"url".rlike(".*robots\\.txt$")) &&
( $"mime_type_web_server".rlike("text/html") ||
$"mime_type_web_server".rlike("application/xhtml+xml") ||
$"url".rlike("(?i).*htm$") ||
$"url".rlike("(?i).*html$")
)
)
.filter($"HttpStatus" === 200)
}
}

/**
* A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
*
Expand All @@ -94,7 +120,7 @@ package object archivesunleashed {
Call KeepImages OR KeepValidPages on RDD depending upon the requirement before calling this method */
def all(): DataFrame = {
val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType,
DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes))
DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes, r.getHttpStatus))

val schema = new StructType()
.add(StructField("crawl_date", StringType, true))
Expand All @@ -103,6 +129,7 @@ package object archivesunleashed {
.add(StructField("mime_type_tika", StringType, true))
.add(StructField("content", StringType, true))
.add(StructField("bytes", BinaryType, true))
.add(StructField("HttpStatus", StringType, true))

val sqlContext = SparkSession.builder()
sqlContext.getOrCreate().createDataFrame(records, schema)
Expand Down
56 changes: 56 additions & 0 deletions src/test/scala/io/archivesunleashed/RecordDFTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.archivesunleashed

import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class RecordDFTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private val badPath = Resources.getResource("arc/badexample.arc.gz").getPath
private val master = "local[4]"
private val appName = "example-spark"
private var sc: SparkContext = _
private val archive = "http://www.archive.org/"
private val sloan = "http://www.sloan.org"
private val regex = raw"Please visit our website at".r

before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
conf.set("spark.driver.allowMultipleContexts", "true");
sc = new SparkContext(conf)
}

test("keep Valid Pages") {
val expected = "http://www.archive.org/"
val base = RecordLoader.loadArchives(arcPath, sc).all()
.keepValidPagesDF().take(1)(0)(1)
assert (base.toString == expected)
}

after {
if (sc != null) {
sc.stop()
}
}
}