diff --git a/src/main/python/aut/common.py b/src/main/python/aut/common.py index aeeaccb1..73d158ad 100644 --- a/src/main/python/aut/common.py +++ b/src/main/python/aut/common.py @@ -8,6 +8,9 @@ def __init__(self, sc, sqlContext, path): self.loader = sc._jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc()) self.path = path + def all(self): + return DataFrame(self.loader.all(self.path), self.sqlContext) + def webpages(self): return DataFrame(self.loader.webpages(self.path), self.sqlContext) diff --git a/src/main/scala/io/archivesunleashed/DataFrameLoader.scala b/src/main/scala/io/archivesunleashed/DataFrameLoader.scala index 32341151..a608a763 100644 --- a/src/main/scala/io/archivesunleashed/DataFrameLoader.scala +++ b/src/main/scala/io/archivesunleashed/DataFrameLoader.scala @@ -27,6 +27,13 @@ class DataFrameLoader(sc: SparkContext) { .webpages() } + /** Create a DataFrame with crawl_date, url, mime_type_web_server, mime_type_tika, content, and bytes. */ + def all(path: String): DataFrame = { + RecordLoader.loadArchives(path, sc) + .keepValidPages() + .all() + } + /** Create a DataFrame with crawl_date, source, destination, and anchor. 
*/ def webgraph(path: String): DataFrame = { RecordLoader.loadArchives(path, sc) diff --git a/src/main/scala/io/archivesunleashed/df/package.scala b/src/main/scala/io/archivesunleashed/df/package.scala index b35bd307..0367aa28 100644 --- a/src/main/scala/io/archivesunleashed/df/package.scala +++ b/src/main/scala/io/archivesunleashed/df/package.scala @@ -45,9 +45,11 @@ package object df { val ExtractImageLinksDF = udf(io.archivesunleashed.matchbox.ExtractImageLinksRDD.apply(_: String, _: String)) - val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5RDD.apply(content.getBytes())) + val ComputeMD5DF = udf(io.archivesunleashed.matchbox.ComputeMD5RDD.apply(_: Array[Byte])) - val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(content.getBytes())) + val ComputeSHA1DF = udf(io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(_: Array[Byte])) + + val ComputeImageSizeDF = udf(io.archivesunleashed.matchbox.ComputeImageSize.apply(_: Array[Byte])) /** * Given a dataframe, serializes binary object and saves to disk diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala index cfeb9930..af994c1f 100644 --- a/src/main/scala/io/archivesunleashed/package.scala +++ b/src/main/scala/io/archivesunleashed/package.scala @@ -30,7 +30,7 @@ import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent import java.net.URI import java.net.URL import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType} import org.apache.hadoop.io.LongWritable import org.apache.spark.{SerializableWritable, SparkContext} import org.apache.spark.rdd.RDD @@ -87,6 +87,25 @@ package object archivesunleashed { * To load such an RDD, please see [[RecordLoader]]. 
*/ implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable { + + /* Creates a DataFrame that also includes a bytes column. + Call keepImages or keepValidPages on the RDD, depending on the requirement, before calling this method. */ + def all(): DataFrame = { + val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType, + DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes)) + + val schema = new StructType() + .add(StructField("crawl_date", StringType, true)) + .add(StructField("url", StringType, true)) + .add(StructField("mime_type_web_server", StringType, true)) + .add(StructField("mime_type_tika", StringType, true)) + .add(StructField("content", StringType, true)) + .add(StructField("bytes", BinaryType, true)) + + val sqlContext = SparkSession.builder() + sqlContext.getOrCreate().createDataFrame(records, schema) + } + /** Removes all non-html-based data (images, executables, etc.) from html text. */ def keepValidPages(): RDD[ArchiveRecord] = { rdd.filter(r => diff --git a/src/test/scala/io/archivesunleashed/df/DataFrameLoaderTest.scala b/src/test/scala/io/archivesunleashed/df/DataFrameLoaderTest.scala index ffea0b38..f0800b04 100644 --- a/src/test/scala/io/archivesunleashed/df/DataFrameLoaderTest.scala +++ b/src/test/scala/io/archivesunleashed/df/DataFrameLoaderTest.scala @@ -60,6 +60,7 @@ class DataFrameLoaderTest extends FunSuite with BeforeAndAfter { val powerpoint = df.presentationProgramFiles(docPath) val word = df.wordProcessorFiles(docPath) val text = df.textFiles(txtPath) + val all = df.all(arcPath) val r_1 = validPages.select(url, mime_type).take(1)(0) assert(r_1.getAs[String](url) == "http://www.archive.org/") @@ -104,6 +105,10 @@ class DataFrameLoaderTest extends FunSuite with BeforeAndAfter { val r_11 = text.take(1)(0) assert(r_11.getAs[String](url) == "https://ruebot.net/files/aut-test-fixtures/aut-text.txt") assert(r_11.getAs[String](md5) == "32abd404fb560ecf14b75611f3cc5c2c") + + val r_12 = 
all.select(url, mime_type).take(1)(0) + assert(r_12.getAs[String](url) == "http://www.archive.org/") + assert(r_12.getAs[String](mime_type) == "text/html") } after {