Merge pull request #2 from javierluraschi/feature/spark-apply-rcpp
Scale WARC processing using Rcpp and sparklyr::spark_apply
javierluraschi authored Jul 23, 2017
2 parents 8079ec8 + 3c15c99 commit a5a0f77
Showing 16 changed files with 316 additions and 38 deletions.
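In practice the change means spark_read_warc() gains filter and include arguments and can pre-filter WARC records in native code before anything reaches Spark. A minimal usage sketch against the bundled sample file (the regular expressions are illustrative only):

library(sparklyr)
library(sparkwarc)

sc <- spark_connect(master = "local")

# Load the bundled sample WARC as table "warc", keeping only entries
# matching `filter` and, within those, only lines matching `include`.
spark_read_warc(
  sc,
  name = "warc",
  path = spark_warc_sample_path(),
  filter = "html",
  include = "<script")

spark_disconnect(sc)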
5 changes: 5 additions & 0 deletions .gitignore
@@ -37,3 +37,8 @@ log4j.spark*
 .cache-main
 .settings
 .classpath
+# Rcpp
+/src/*.o
+/src/*.o-*
+/src/*.d
+/src/*.so
10 changes: 7 additions & 3 deletions DESCRIPTION
@@ -1,7 +1,7 @@
 Package: sparkwarc
 Type: Package
 Title: Load WARC Files into Apache Spark
-Version: 0.1.1
+Version: 0.1.3
 Authors@R: c(
     person("Javier", "Luraschi", email = "javier@rstudio.com", role = c("aut", "cre"))
     )
@@ -13,6 +13,10 @@ BugReports: https://github.com/javierluraschi/sparkwarc
 Encoding: UTF-8
 LazyData: true
 Imports:
+    DBI,
     sparklyr,
-    DBI
-RoxygenNote: 5.0.1
+    Rcpp
+RoxygenNote: 6.0.0
+LinkingTo:
+    Rcpp
+SystemRequirements: C++11
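For context: the LinkingTo field exposes the Rcpp headers at compile time, the Imports entry keeps Rcpp loaded at run time, and SystemRequirements: C++11 declares the C++ standard that src/makevars selects via CXX_STD.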
4 changes: 4 additions & 0 deletions NAMESPACE
@@ -1,7 +1,11 @@
 # Generated by roxygen2: do not edit by hand
 
 export(cc_warc)
+export(rcpp_read_warc_sample)
 export(spark_read_warc)
+export(spark_read_warc_sample)
+export(spark_warc_sample_path)
 import(DBI)
 import(sparklyr)
 importFrom(utils,read.table)
+useDynLib(sparkwarc, .registration = TRUE)
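The new useDynLib(sparkwarc, .registration = TRUE) directive loads the compiled code on package load and restricts .Call() to the routines registered in src/RcppExports.cpp below.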
11 changes: 11 additions & 0 deletions R/RcppExports.R
@@ -0,0 +1,11 @@
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

rcpp_hello_world <- function() {
    .Call(`_sparkwarc_rcpp_hello_world`)
}

rcpp_read_warc <- function(path, filter, include) {
    .Call(`_sparkwarc_rcpp_read_warc`, path, filter, include)
}

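Both RcppExports files are generated from the Rcpp export attributes in the C++ sources, so the usual workflow after touching src/ is to regenerate them rather than edit by hand:

# Run from the package root; rewrites R/RcppExports.R and src/RcppExports.cpp.
Rcpp::compileAttributes()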
42 changes: 42 additions & 0 deletions R/sample.R
@@ -0,0 +1,42 @@
#' Retrieves sample warc path
#'
#' @export
spark_warc_sample_path <- function() {
  normalizePath(system.file("samples/sample.warc.gz", package = "sparkwarc"))
}

#' Loads the sample warc file using Rcpp
#'
#' @param filter A regular expression used to filter each warc entry
#'   efficiently by running native code using \code{Rcpp}.
#' @param include A regular expression used to keep only matching lines
#'   efficiently by running native code using \code{Rcpp}.
#'
#' @export
rcpp_read_warc_sample <- function(filter = "", include = "") {
  sample_warc <- spark_warc_sample_path()

  sparkwarc:::rcpp_read_warc(sample_warc, filter, include)
}

#' Loads the sample warc file into Spark
#'
#' @param sc An active \code{spark_connection}.
#' @param filter A regular expression used to filter each warc entry
#'   efficiently by running native code using \code{Rcpp}.
#' @param include A regular expression used to keep only matching lines
#'   efficiently by running native code using \code{Rcpp}.
#'
#' @export
spark_read_warc_sample <- function(sc, filter = "", include = "") {
  sample_warc <- spark_warc_sample_path()

  spark_read_warc(
    sc,
    "sample_warc",
    sample_warc,
    overwrite = TRUE,
    group = TRUE,
    filter = filter,
    include = include)
}
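These helpers make it easy to exercise the native reader with and without a cluster. A short sketch, with illustrative regular expressions:

library(sparkwarc)

# Native path, no Spark required: parse the bundled sample directly.
local_df <- rcpp_read_warc_sample(filter = "WARC-Target-URI", include = "<html")

# The same sample through Spark, registered as the table "sample_warc".
library(sparklyr)
sc <- spark_connect(master = "local")
sample_tbl <- spark_read_warc_sample(sc, filter = "WARC-Target-URI")
spark_disconnect(sc)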
53 changes: 43 additions & 10 deletions R/sparkwarc.R
@@ -15,6 +15,10 @@
 #' @param group \code{TRUE} to group by warc segment. Currently supported
 #'   only in HDFS and uncompressed files.
 #' @param parse \code{TRUE} to parse warc into tags, attribute, value, etc.
+#' @param filter A regular expression used to filter each warc entry
+#'   efficiently by running native code using \code{Rcpp}.
+#' @param include A regular expression used to keep only matching lines
+#'   efficiently by running native code using \code{Rcpp}.
 #' @param ... Additional arguments reserved for future use.
 #'
 #' @examples
@@ -32,6 +36,7 @@
 #' spark_disconnect(sc)
 #'
 #' @export
+#' @useDynLib sparkwarc, .registration = TRUE
 #' @import DBI
 spark_read_warc <- function(sc,
                             name,
@@ -41,26 +46,54 @@ spark_read_warc <- function(sc,
                             overwrite = TRUE,
                             group = FALSE,
                             parse = FALSE,
+                            filter = "",
+                            include = "",
                             ...) {
   if (overwrite && name %in% dbListTables(sc)) {
     dbRemoveTable(sc, name)
   }
 
-  df <- sparklyr::invoke_static(
-    sc,
-    "SparkWARC.WARC",
-    if (parse) "parse" else "load",
-    spark_context(sc),
-    path,
-    group,
-    as.integer(repartition))
+  if (!parse) {
+    paths_df <- data.frame(paths = strsplit(path, ",")[[1]])
+    paths_tbl <- sdf_copy_to(sc, paths_df, name = "sparkwarc_paths", overwrite = TRUE)
+
+    if (repartition > 0)
+      paths_tbl <- sdf_repartition(paths_tbl, repartition)
+
+    df <- spark_apply(paths_tbl, function(df) {
+      rcpp_read_warc <- get("rcpp_read_warc", envir = asNamespace("sparkwarc"))
+
+      entries <- apply(df, 1, function(path) {
+        if (grepl("s3n://", path)) {
+          path <- sub("s3n://commoncrawl/", "https://commoncrawl.s3.amazonaws.com/", path)
+          temp_warc <- tempfile(fileext = ".warc.gz")
+          download.file(url = path, destfile = temp_warc)
+          path <- temp_warc
+        }
+
+        rcpp_read_warc(path, filter = filter, include = include)
+      })
+
+      if (nrow(df) > 1) do.call("rbind", entries) else data.frame(entries)
+    }, names = c("tags", "content")) %>% spark_dataframe()
+  }
+  else {
+    df <- sparklyr::invoke_static(
+      sc,
+      "SparkWARC.WARC",
+      if (parse) "parse" else "load",
+      spark_context(sc),
+      path,
+      group,
+      as.integer(repartition))
+  }
 
-  invoke(df, "registerTempTable", name)
+  result_tbl <- sdf_register(df, name)
 
   if (memory) {
     dbGetQuery(sc, paste("CACHE TABLE", DBI::dbQuoteIdentifier(sc, name)))
     dbGetQuery(sc, paste("SELECT count(*) FROM", DBI::dbQuoteIdentifier(sc, name)))
   }
 
-  invisible(NULL)
+  result_tbl
 }
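The key change: when parse = FALSE, the WARC paths are no longer handed to Spark's text reader. Instead a one-column table of paths is copied to the cluster, sdf_repartition() spreads the rows across executors, and spark_apply() runs the Rcpp reader on each worker; the get(..., envir = asNamespace("sparkwarc")) lookup resolves the native routine inside the worker's own installation of the package, since the closure itself is serialized to the executors. A minimal self-contained sketch of the same pattern (read_one() below is a hypothetical stand-in for the native reader, not part of the package):

library(sparklyr)

sc <- spark_connect(master = "local")

# One row per input; partitioning decides which worker handles which file.
inputs <- sdf_copy_to(sc, data.frame(paths = c("a.warc.gz", "b.warc.gz")),
                      name = "inputs", overwrite = TRUE)

result <- spark_apply(inputs, function(df) {
  # Hypothetical per-file reader returning one tags/content row per path.
  read_one <- function(path) data.frame(tags = nchar(path), content = path)
  do.call(rbind, lapply(df$paths, read_one))
}, names = c("tags", "content"))

spark_disconnect(sc)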
11 changes: 5 additions & 6 deletions README.Rmd
@@ -27,11 +27,10 @@ library(dplyr)

 ```{r connect-1, max.print=10}
 sc <- spark_connect(master = "local")
-spark_read_warc(
-  sc,
-  "warc",
-  system.file("samples/sample.warc.gz", package = "sparkwarc"),
-  repartition = 8)
 ```
 
+```{r load-sample}
+spark_read_warc(sc, path = spark_warc_sample_path(), name = "WARC")
+```
+
 ```{sql query-1, connection=sc, max.print=1}
@@ -105,7 +104,7 @@ config[["sparklyr.shell.driver-memory"]] <- "10G"
 sc <- spark_connect(master = "local", config = config)
 ```
 
-```{r load-1}
+```{r load-full}
 spark_read_warc(
   sc,
   "warc",
16 changes: 0 additions & 16 deletions java/SparkWARC.scala
@@ -7,22 +7,6 @@ import scala.util.matching._
 import org.apache.spark.sql.types._
 
 object WARC {
-  def load(sc: SparkContext, path: String, group: Boolean, repartitions: Int) : DataFrame = {
-    if (group) sc.hadoopConfiguration.set(
-      "textinputformat.record.delimiter", "WARC/1.0"
-    )
-
-    val warc = sc.textFile(path)
-
-    val sqlContext = new SQLContext(sc)
-    import sqlContext.implicits._
-
-    val df = warc.toDF
-    sc.hadoopConfiguration.unset("textinputformat.record.delimiter")
-
-    if (repartitions > 0) df.repartition(repartitions) else df
-  }
-
   def parse(sc: SparkContext, path: String, group: Boolean, repartitions: Int) : DataFrame = {
     val sqlContext = new SQLContext(sc)
     val warc = sc.textFile(path)
1 change: 0 additions & 1 deletion man/cc_warc.Rd
18 changes: 18 additions & 0 deletions man/rcpp_read_warc_sample.Rd
10 changes: 8 additions & 2 deletions man/spark_read_warc.Rd
20 changes: 20 additions & 0 deletions man/spark_read_warc_sample.Rd
11 changes: 11 additions & 0 deletions man/spark_warc_sample_path.Rd

(These .Rd files are generated by roxygen2 and are not rendered in the diff.)

41 changes: 41 additions & 0 deletions src/RcppExports.cpp
@@ -0,0 +1,41 @@
// Generated by using Rcpp::compileAttributes() -> do not edit by hand
// Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

#include <Rcpp.h>

using namespace Rcpp;

// rcpp_hello_world
List rcpp_hello_world();
RcppExport SEXP _sparkwarc_rcpp_hello_world() {
BEGIN_RCPP
    Rcpp::RObject rcpp_result_gen;
    Rcpp::RNGScope rcpp_rngScope_gen;
    rcpp_result_gen = Rcpp::wrap(rcpp_hello_world());
    return rcpp_result_gen;
END_RCPP
}
// rcpp_read_warc
DataFrame rcpp_read_warc(std::string path, std::string filter, std::string include);
RcppExport SEXP _sparkwarc_rcpp_read_warc(SEXP pathSEXP, SEXP filterSEXP, SEXP includeSEXP) {
BEGIN_RCPP
    Rcpp::RObject rcpp_result_gen;
    Rcpp::RNGScope rcpp_rngScope_gen;
    Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
    Rcpp::traits::input_parameter< std::string >::type filter(filterSEXP);
    Rcpp::traits::input_parameter< std::string >::type include(includeSEXP);
    rcpp_result_gen = Rcpp::wrap(rcpp_read_warc(path, filter, include));
    return rcpp_result_gen;
END_RCPP
}

static const R_CallMethodDef CallEntries[] = {
    {"_sparkwarc_rcpp_hello_world", (DL_FUNC) &_sparkwarc_rcpp_hello_world, 0},
    {"_sparkwarc_rcpp_read_warc",   (DL_FUNC) &_sparkwarc_rcpp_read_warc,   3},
    {NULL, NULL, 0}
};

RcppExport void R_init_sparkwarc(DllInfo *dll) {
    R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);
    R_useDynamicSymbols(dll, FALSE);
}
2 changes: 2 additions & 0 deletions src/makevars
@@ -0,0 +1,2 @@
CXX_STD=CXX11
PKG_LIBS=-lboost_regex
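Note that PKG_LIBS=-lboost_regex links the native reader against Boost.Regex, so the Boost regex headers and library must be present on the machine at install time (typically via a system package such as libboost-regex-dev on Debian-derived distributions).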