Skip to content

Commit

Permalink
Add all() method and refactor DF UDFs (#383).
Browse files Browse the repository at this point in the history
- Add `all()` DataFrame method 
- Refactor fixity DataFrame UDFs
- Add ComputeImageSize UDF
- Add Python implementation of `all()`
- Addresses #223
  • Loading branch information
Gursimran Singh authored and ruebot committed Nov 21, 2019
1 parent d8e8df3 commit c4eaca9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/main/python/aut/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ def __init__(self, sc, sqlContext, path):
self.loader = sc._jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())
self.path = path

def all(self):
    """Load all records at ``self.path`` and wrap the result as a DataFrame."""
    java_df = self.loader.all(self.path)
    return DataFrame(java_df, self.sqlContext)

def webpages(self):
    """Load webpage records at ``self.path`` and wrap the result as a DataFrame."""
    java_df = self.loader.webpages(self.path)
    return DataFrame(java_df, self.sqlContext)

Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/io/archivesunleashed/DataFrameLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class DataFrameLoader(sc: SparkContext) {
.webpages()
}

/** Create a DataFrame with crawl_date, url, mime_type_web_server, mime_type_tika, content, and bytes. */
def all(path: String): DataFrame = {
  RecordLoader.loadArchives(path, sc)
    .keepValidPages()
    .all()
}

/** Create a DataFrame with crawl_date, source, destination, and anchor. */
def webgraph(path: String): DataFrame = {
RecordLoader.loadArchives(path, sc)
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/io/archivesunleashed/df/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ package object df {

val ExtractImageLinksDF = udf(io.archivesunleashed.matchbox.ExtractImageLinksRDD.apply(_: String, _: String))

val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5RDD.apply(content.getBytes()))
/** UDF computing the MD5 checksum of a binary (`Array[Byte]`) column. */
val ComputeMD5DF = udf(io.archivesunleashed.matchbox.ComputeMD5RDD.apply(_: Array[Byte]))

val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(content.getBytes()))
/** UDF computing the SHA1 checksum of a binary (`Array[Byte]`) column. */
val ComputeSHA1DF = udf(io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(_: Array[Byte]))

/** UDF computing the size of an image from a binary (`Array[Byte]`) column. */
val ComputeImageSizeDF = udf(io.archivesunleashed.matchbox.ComputeImageSize.apply(_: Array[Byte]))

/**
* Given a dataframe, serializes binary object and saves to disk
Expand Down
21 changes: 20 additions & 1 deletion src/main/scala/io/archivesunleashed/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent
import java.net.URI
import java.net.URL
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType}
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -87,6 +87,25 @@ package object archivesunleashed {
* To load such an RDD, please see [[RecordLoader]].
*/
implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {

/** Converts the RDD to a DataFrame with crawl_date, url, mime_type_web_server,
  * mime_type_tika, content, and bytes columns (bytes holds the raw binary payload).
  *
  * Call keepImages() or keepValidPages() on the RDD, depending on the
  * requirement, before calling this method.
  *
  * @return a DataFrame of the records, one row per ArchiveRecord.
  */
def all(): DataFrame = {
  val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType,
    DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes))

  val schema = new StructType()
    .add(StructField("crawl_date", StringType, true))
    .add(StructField("url", StringType, true))
    .add(StructField("mime_type_web_server", StringType, true))
    .add(StructField("mime_type_tika", StringType, true))
    .add(StructField("content", StringType, true))
    .add(StructField("bytes", BinaryType, true))

  // Reuse the active SparkSession (or create one) to build the DataFrame.
  // Note: the previous local was misleadingly named `sqlContext` while
  // actually holding a SparkSession.Builder.
  val spark = SparkSession.builder().getOrCreate()
  spark.createDataFrame(records, schema)
}

/** Removes all non-html-based data (images, executables, etc.) from html text. */
def keepValidPages(): RDD[ArchiveRecord] = {
rdd.filter(r =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class DataFrameLoaderTest extends FunSuite with BeforeAndAfter {
val powerpoint = df.presentationProgramFiles(docPath)
val word = df.wordProcessorFiles(docPath)
val text = df.textFiles(txtPath)
val all = df.all(arcPath)

val r_1 = validPages.select(url, mime_type).take(1)(0)
assert(r_1.getAs[String](url) == "http://www.archive.org/")
Expand Down Expand Up @@ -104,6 +105,10 @@ class DataFrameLoaderTest extends FunSuite with BeforeAndAfter {
val r_11 = text.take(1)(0)
assert(r_11.getAs[String](url) == "https://ruebot.net/files/aut-test-fixtures/aut-text.txt")
assert(r_11.getAs[String](md5) == "32abd404fb560ecf14b75611f3cc5c2c")

val r_12 = all.select(url, mime_type).take(1)(0)
assert(r_12.getAs[String](url) == "http://www.archive.org/")
assert(r_12.getAs[String](mime_type) == "text/html")
}

after {
Expand Down

0 comments on commit c4eaca9

Please sign in to comment.