
Append UDF with RDD or DF.
- Addresses #223
ruebot committed Nov 19, 2019
1 parent a081d7b commit 0a68c81
Showing 32 changed files with 110 additions and 110 deletions.
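This commit renames the matchbox UDFs used on RDDs with an `RDD` suffix, and their DataFrame counterparts in `io.archivesunleashed.df` with a `DF` suffix. A minimal sketch of the convention after the rename (the URL is a hypothetical example):

```scala
import io.archivesunleashed.matchbox.ExtractDomainRDD

// RDD-side matchbox UDF, now suffixed with "RDD".
val domain = ExtractDomainRDD("http://www.archive.org/index.html")
// domain == "www.archive.org"

// The DataFrame-side equivalent is io.archivesunleashed.df.ExtractDomainDF,
// applied to a column, e.g. df.ExtractDomainDF($"url").
```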
8 changes: 4 additions & 4 deletions src/main/scala/io/archivesunleashed/ArchiveRecord.scala
@@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
import java.security.MessageDigest

import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils, ArchiveRecordWritable}
- import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDate, ExtractDomain, RemoveHTTPHeader}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDate, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import org.apache.spark.SerializableWritable
import org.archive.io.arc.ARCRecord
import org.archive.io.warc.WARCRecord
@@ -149,14 +149,14 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
}

val getDomain: String = {
- ExtractDomain(getUrl)
+ ExtractDomainRDD(getUrl)
}

val getBinaryBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/")) {
getContentBytes.slice(
- getContentString.indexOf(RemoveHTTPHeader.headerEnd)
- + RemoveHTTPHeader.headerEnd.length, getContentBytes.length)
+ getContentString.indexOf(RemoveHTTPHeaderRDD.headerEnd)
+ + RemoveHTTPHeaderRDD.headerEnd.length, getContentBytes.length)
} else {
getContentBytes
}
@@ -32,7 +32,7 @@ object DomainFrequencyExtractor
def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
records
.keepValidPages()
- .map(r => matchbox.ExtractDomain(r.getUrl))
+ .map(r => matchbox.ExtractDomainRDD(r.getUrl))
.countItems()
}

@@ -47,7 +47,7 @@ object DomainFrequencyExtractor
import spark.implicits._
// scalastyle:on

- d.select(df.ExtractDomain($"url").as("domain"))
+ d.select(df.ExtractDomainDF($"url").as("domain"))
.groupBy("domain").count().orderBy(desc("count"))
}
}
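Either entry point yields the same per-domain counts. A usage sketch with the renamed APIs (the archive path is hypothetical, and `sc` is the Spark shell's SparkContext):

```scala
import io.archivesunleashed._
import io.archivesunleashed.app.DomainFrequencyExtractor

// RDD flavour: count valid pages per domain.
val records = RecordLoader.loadArchives("/path/to/example.warc.gz", sc)
DomainFrequencyExtractor(records).take(10)
```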
@@ -17,7 +17,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
- import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}
+ import io.archivesunleashed.matchbox.{ExtractDomainRDD, ExtractLinksRDD}
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.desc
@@ -33,11 +33,11 @@ object DomainGraphExtractor
def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
records
.keepValidPages()
- .map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
+ .map(r => (r.getCrawlDate, ExtractLinksRDD(r.getUrl, r.getContentString)))
.flatMap(r => r._2.map(f =>
(r._1,
- ExtractDomain(f._1).replaceAll("^\\s*www\\.", ""),
- ExtractDomain(f._2).replaceAll("^\\s*www\\.", ""))
+ ExtractDomainRDD(f._1).replaceAll("^\\s*www\\.", ""),
+ ExtractDomainRDD(f._2).replaceAll("^\\s*www\\.", ""))
))
.filter(r => r._2 != "" && r._3 != "")
.countItems()
@@ -55,8 +55,8 @@ object DomainGraphExtractor
import spark.implicits._
// scalastyle:on
d.select($"crawl_date",
df.RemovePrefixWWW(df.ExtractDomain($"src")).as("src_domain"),
df.RemovePrefixWWW(df.ExtractDomain($"dest")).as("dest_domain"))
df.RemovePrefixWWWDF(df.ExtractDomainDF($"src")).as("src_domain"),
df.RemovePrefixWWWDF(df.ExtractDomainDF($"dest")).as("dest_domain"))
.filter("src_domain != ''").filter("dest_domain != ''")
.groupBy($"crawl_date", $"src_domain", $"dest_domain").count().orderBy(desc("count"))
}
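Reusing the `records` RDD from the earlier sketch, the RDD entry point returns ((crawl_date, src_domain, dest_domain), count) tuples:

```scala
import io.archivesunleashed.app.DomainGraphExtractor

// Top weighted domain-to-domain links in the collection.
DomainGraphExtractor(records).take(10)
```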
4 changes: 2 additions & 2 deletions src/main/scala/io/archivesunleashed/app/ExtractEntities.scala
@@ -16,7 +16,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.RecordLoader
- import io.archivesunleashed.matchbox.{ComputeMD5, NERClassifier, RemoveHTML}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD, NERClassifier, RemoveHTMLRDD}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

@@ -43,7 +43,7 @@ object ExtractEntities
.keepValidPages()
.map(r => (("\"timestamp\":\"" + r.getCrawlDate + "\""),
("\"url\":\"" + r.getUrl + "\""),
- (RemoveHTML(r.getContentString)),
+ (RemoveHTMLRDD(r.getContentString)),
("\"digest\":\"" + r.getPayloadDigest + "\"")))
extractAndOutput(iNerClassifierFile, rdd, outputFile)
}
@@ -16,7 +16,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.ArchiveRecord
- import io.archivesunleashed.matchbox.{ComputeImageSize, ComputeMD5}
+ import io.archivesunleashed.matchbox.{ComputeImageSize, ComputeMD5RDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkContext}

@@ -39,7 +39,7 @@ object ExtractPopularImages
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getBinaryBytes), 1))
- .map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
+ .map(img => (ComputeMD5RDD(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
.filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
.reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
.map(x=> (x._2._3, x._2._2))
@@ -17,7 +17,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.{ArchiveRecord, df}
- import io.archivesunleashed.matchbox.RemoveHTML
+ import io.archivesunleashed.matchbox.RemoveHTMLRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

@@ -31,7 +31,7 @@ object PlainTextExtractor
def apply(records: RDD[ArchiveRecord]): RDD[(String, String, String, String)] = {
records
.keepValidPages()
- .map(r => (r.getCrawlDate, r.getDomain, r.getUrl, RemoveHTML(r.getContentString)))
+ .map(r => (r.getCrawlDate, r.getDomain, r.getUrl, RemoveHTMLRDD(r.getContentString)))
}

/** Extract plain text from web archive using Data Frame and Spark SQL.
@@ -44,7 +44,7 @@
// scalastyle:off
import spark.implicits._
// scalastyle:on
d.select($"crawl_date", df.ExtractDomain($"url").as("domain"),
$"url", df.RemoveHTML($"content").as("Text"))
d.select($"crawl_date", df.ExtractDomainDF($"url").as("domain"),
$"url", df.RemoveHTMLDF($"content").as("Text"))
}
}
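A quick RDD-side sketch, again reusing `records` from above; each result is a (crawl_date, domain, url, text) tuple with headers and markup stripped:

```scala
import io.archivesunleashed.app.PlainTextExtractor

PlainTextExtractor(records).take(3)
```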
16 changes: 8 additions & 8 deletions src/main/scala/io/archivesunleashed/app/WriteGEXF.scala
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
- import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -59,18 +59,18 @@ object WriteGEXF
val endAttribute = "\" />\n"
val nodeStart = "<node id=\""
val labelStart = "\" label=\""
val edges = rdd.map(r => "<edge source=\"" + ComputeMD5(r._1._2.getBytes) + "\" target=\"" +
ComputeMD5(r._1._3.getBytes) + "\" weight=\"" + r._2 +
val edges = rdd.map(r => "<edge source=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\" target=\"" +
ComputeMD5RDD(r._1._3.getBytes) + "\" weight=\"" + r._2 +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + r._1._1 + endAttribute +
"</attvalues>\n" +
"</edge>\n").collect
val nodes = rdd.flatMap(r => List(nodeStart +
- ComputeMD5(r._1._2.getBytes) + labelStart +
+ ComputeMD5RDD(r._1._2.getBytes) + labelStart +
r._1._2.escapeInvalidXML() + endAttribute,
nodeStart +
- ComputeMD5(r._1._3.getBytes) + labelStart +
+ ComputeMD5RDD(r._1._3.getBytes) + labelStart +
r._1._3.escapeInvalidXML() + endAttribute)).distinct.collect
outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<gexf xmlns=\"http://www.gexf.net/1.3draft\"\n" +
@@ -119,13 +119,13 @@ object WriteGEXF
"<nodes>\n")
vertices foreach { v =>
outFile.write("<node id=\"" +
- ComputeMD5(v.getBytes) + "\" label=\"" +
+ ComputeMD5RDD(v.getBytes) + "\" label=\"" +
v.escapeInvalidXML() + endAttribute)
}
outFile.write("</nodes>\n<edges>\n")
data foreach { e =>
outFile.write("<edge source=\"" + ComputeMD5(e.get(1).asInstanceOf[String].getBytes) + "\" target=\"" +
ComputeMD5(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
outFile.write("<edge source=\"" + ComputeMD5RDD(e.get(1).asInstanceOf[String].getBytes) + "\" target=\"" +
ComputeMD5RDD(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + e.get(0) + endAttribute +
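Combining the two extractors above gives a ready-made graph export. A sketch assuming WriteGEXF's RDD `apply(rdd, path)` entry point wraps the `makeFile` shown here (the output path is hypothetical):

```scala
import io.archivesunleashed.app.{DomainGraphExtractor, WriteGEXF}

// Writes the domain graph from `records` to a GEXF file.
WriteGEXF(DomainGraphExtractor(records), "example.gexf")
```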
8 changes: 4 additions & 4 deletions src/main/scala/io/archivesunleashed/app/WriteGraph.scala
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
- import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -173,13 +173,13 @@ object WriteGraph
"<nodes>\n")
vertices.foreach { v =>
outFile.write(nodeStart +
- ComputeMD5(v.getBytes) + "\" label=\"" +
+ ComputeMD5RDD(v.getBytes) + "\" label=\"" +
v.escapeInvalidXML() + endAttribute)
}
outFile.write("</nodes>\n<edges>\n")
data.foreach { e =>
- outFile.write(edgeStart + ComputeMD5(e.get(1).asInstanceOf[String].getBytes) + targetChunk +
- ComputeMD5(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
+ outFile.write(edgeStart + ComputeMD5RDD(e.get(1).asInstanceOf[String].getBytes) + targetChunk +
+ ComputeMD5RDD(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + e.get(0) + endAttribute +
10 changes: 5 additions & 5 deletions src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
- import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -45,14 +45,14 @@ object WriteGraphML
*/
def makeFile (rdd: RDD[((String, String, String), Int)], graphmlPath: String): Boolean = {
val outFile = Files.newBufferedWriter(Paths.get(graphmlPath), StandardCharsets.UTF_8)
val edges = rdd.map(r => "<edge source=\"" + ComputeMD5(r._1._2.getBytes) + "\" target=\"" +
ComputeMD5(r._1._3.getBytes) + "\" type=\"directed\">\n" +
val edges = rdd.map(r => "<edge source=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\" target=\"" +
ComputeMD5RDD(r._1._3.getBytes) + "\" type=\"directed\">\n" +
"<data key=\"weight\">" + r._2 + "</data>\n" +
"<data key=\"crawlDate\">" + r._1._1 + "</data>\n" +
"</edge>\n").collect
val nodes = rdd.flatMap(r => List("<node id=\"" + ComputeMD5(r._1._2.getBytes) + "\">\n" +
val nodes = rdd.flatMap(r => List("<node id=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\">\n" +
"<data key=\"label\">" + r._1._2.escapeInvalidXML() + "</data>\n</node>\n",
"<node id=\"" + ComputeMD5(r._1._3.getBytes) + "\">\n" +
"<node id=\"" + ComputeMD5RDD(r._1._3.getBytes) + "\">\n" +
"<data key=\"label\">" + r._1._3.escapeInvalidXML() + "</data>\n</node>\n")).distinct.collect
outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" +
24 changes: 12 additions & 12 deletions src/main/scala/io/archivesunleashed/df/package.scala
@@ -16,7 +16,7 @@
package io.archivesunleashed

import org.apache.commons.io.IOUtils
- import io.archivesunleashed.matchbox.{ComputeMD5}
+ import io.archivesunleashed.matchbox.{ComputeMD5RDD}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import java.io.ByteArrayInputStream
@@ -31,23 +31,23 @@ package object df
// by wrapping matchbox UDFs or by reimplementing them. The following examples illustrate. Obviously, we'll
// need to populate more UDFs over time, but this is a start.

- val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, ""))
+ val ExtractDomainDF = udf(io.archivesunleashed.matchbox.ExtractDomainRDD.apply(_: String, ""))

- val RemoveHTTPHeader = udf(io.archivesunleashed.matchbox.RemoveHTTPHeader.apply(_: String))
+ val RemoveHTTPHeaderDF = udf(io.archivesunleashed.matchbox.RemoveHTTPHeaderRDD.apply(_: String))

- val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))
+ val RemovePrefixWWWDF = udf[String, String](_.replaceAll("^\\s*www\\.", ""))

- var RemoveHTML = udf(io.archivesunleashed.matchbox.RemoveHTML.apply(_: String))
+ var RemoveHTMLDF = udf(io.archivesunleashed.matchbox.RemoveHTMLRDD.apply(_: String))

- val ExtractLinks = udf(io.archivesunleashed.matchbox.ExtractLinks.apply(_: String, _: String))
+ val ExtractLinksDF = udf(io.archivesunleashed.matchbox.ExtractLinksRDD.apply(_: String, _: String))

- val GetExtensionMime = udf(io.archivesunleashed.matchbox.GetExtensionMime.apply(_: String, _: String))
+ val GetExtensionMimeDF = udf(io.archivesunleashed.matchbox.GetExtensionMimeRDD.apply(_: String, _: String))

- val ExtractImageLinks = udf(io.archivesunleashed.matchbox.ExtractImageLinks.apply(_: String, _: String))
+ val ExtractImageLinksDF = udf(io.archivesunleashed.matchbox.ExtractImageLinksRDD.apply(_: String, _: String))

- val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5.apply(content.getBytes()))
- val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1.apply(content.getBytes()))
+ val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5RDD.apply(content.getBytes()))
+ val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(content.getBytes()))

/**
* Given a dataframe, serializes binary object and saves to disk
@@ -70,7 +70,7 @@ package object df
val in = new ByteArrayInputStream(bytes);

val extension: String = row.getAs(extensionColumnName);
- val suffix = ComputeMD5(bytes)
+ val suffix = ComputeMD5RDD(bytes)
val file = new FileOutputStream(fileName + "-" + suffix + "." + extension.toLowerCase)
IOUtils.copy(in, file)
file.close()
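With the DF-suffixed names in place, column-level use looks like the following sketch. It assumes a DataFrame `validPages` with `url` and `content` columns, as the extractors above expect, plus a `spark` session in scope for the implicits:

```scala
import io.archivesunleashed.df
import spark.implicits._

// Domain and plain text per page, via the renamed DataFrame UDFs.
validPages
  .select(df.ExtractDomainDF($"url").as("domain"),
          df.RemoveHTMLDF($"content").as("text"))
  .show(5)
```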
@@ -19,7 +19,7 @@ import java.security.MessageDigest

/** Compute MD5 checksum. */
// scalastyle:off object.name
- object ComputeMD5 {
+ object ComputeMD5RDD {
// scalastyle:on object.name
/** Computes the MD5 checksum of a byte array (eg. an image).
*
@@ -19,7 +19,7 @@ import java.security.MessageDigest

/** Compute SHA1 checksum. */
// scalastyle:off object.name
- object ComputeSHA1 {
+ object ComputeSHA1RDD {
// scalastyle:on object.name
/** Computes the MD5 checksum of a byte array (eg. an image).
*
@@ -29,7 +29,7 @@ object ExtractBoilerpipeText
*/

def apply(input: String): String = {
- removeBoilerplate(RemoveHTTPHeader(input))
+ removeBoilerplate(RemoveHTTPHeaderRDD(input))
}

private def removeBoilerplate(input: String): String = {
@@ -18,7 +18,7 @@ package io.archivesunleashed.matchbox
import java.net.URL

/** Extracts the host domain name from a full url string. */
- object ExtractDomain {
+ object ExtractDomainRDD {
/** Extract source domains from a full url string.
*
* @param url a url as a string
@@ -21,7 +21,7 @@ import org.jsoup.select.Elements
import scala.collection.mutable

/** Extracts image links from a webpage given the HTML content (using Jsoup). */
- object ExtractImageLinks {
+ object ExtractImageLinksRDD {

/** Extracts image links.
*
@@ -22,7 +22,7 @@ import scala.collection.mutable
import scala.Option

/** Extracts links from a webpage given the HTML content (using Jsoup). */
- object ExtractLinks {
+ object ExtractLinksRDD {

/** Extract links.
*
@@ -19,7 +19,7 @@ import org.apache.commons.io.FilenameUtils

/** Get file extension using MIME type, then URL extension. */
// scalastyle:off object.name
- object GetExtensionMime {
+ object GetExtensionMimeRDD {
// scalastyle:on object.name

/** Returns the extension of a file specified by URL
@@ -19,7 +19,7 @@ import java.io.IOException
import org.jsoup.Jsoup

/** Removes HTML markup with JSoup. */
- object RemoveHTML {
+ object RemoveHTMLRDD {

/** Removes HTML markup.
*
@@ -28,7 +28,7 @@ object RemoveHTML
*/
def apply(content: String): String = {
// First remove the HTTP header.
- val maybeContent: Option[String] = Option(RemoveHTTPHeader(content))
+ val maybeContent: Option[String] = Option(RemoveHTTPHeaderRDD(content))
maybeContent match {
case Some(content) =>
Jsoup.parse(content).text().replaceAll("[\\r\\n]+", " ")
@@ -16,7 +16,7 @@
package io.archivesunleashed.matchbox

/** Remove HTTP headers. */
- object RemoveHTTPHeader {
+ object RemoveHTTPHeaderRDD {
val headerEnd = "\r\n\r\n"

/** Remove HTTP headers.
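Taken together, the renamed helpers still compose as before. A minimal sketch of the RDD-side text pipeline shown above (the record content is a contrived example):

```scala
import io.archivesunleashed.matchbox.RemoveHTMLRDD

// RemoveHTMLRDD strips the HTTP header via RemoveHTTPHeaderRDD,
// then removes markup with Jsoup.
val text = RemoveHTMLRDD("HTTP/1.1 200 OK\r\n\r\n<html><body><p>Hello, AUT!</p></body></html>")
// text == "Hello, AUT!"
```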