Skip to content

Commit

Permalink
rewrite cpp source code to actually make use of C++11 features plus o…
Browse files Browse the repository at this point in the history
…ther minor changes

Signed-off-by: Yitao Li <yitao@rstudio.com>
  • Loading branch information
yitao-li committed Dec 17, 2020
1 parent 16b7640 commit 8d898a7
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 72 deletions.
4 changes: 0 additions & 4 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# Generated by using Rcpp::compileAttributes() -> do not edit by hand
# Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393

rcpp_hello_world <- function() {
.Call(`_sparkwarc_rcpp_hello_world`)
}

rcpp_read_warc <- function(path, filter, include) {
.Call(`_sparkwarc_rcpp_read_warc`, path, filter, include)
}
Expand Down
11 changes: 6 additions & 5 deletions R/sparkwarc.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
#'
#' \dontrun{
#' library(sparklyr)
#' sc <- spark_connect(master = "spark://HOST:PORT")
#' df <- spark_read_warc(
#' library(sparkwarc)
#' sc <- spark_connect(master = "local")
#' sdf <- spark_read_warc(
#' sc,
#' system.file("samples/sample.warc", package = "sparkwarc"),
#' repartition = FALSE,
#' name = "sample_warc",
#' path = system.file(file.path("samples", "sample.warc"), package = "sparkwarc"),
#' memory = FALSE,
#' overwrite = FALSE
#' )
Expand Down Expand Up @@ -62,7 +63,7 @@ spark_read_warc <- function(sc,
paths_df,
name = "sparkwarc_paths",
overwrite = TRUE,
repartition = path_repartition)
repartition = as.integer(path_repartition))

df <- spark_apply(paths_tbl, function(df) {
entries <- apply(df, 1, function(path) {
Expand Down
9 changes: 5 additions & 4 deletions man/spark_read_warc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 4 additions & 15 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,21 @@

using namespace Rcpp;

// rcpp_hello_world
List rcpp_hello_world();
RcppExport SEXP _sparkwarc_rcpp_hello_world() {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
rcpp_result_gen = Rcpp::wrap(rcpp_hello_world());
return rcpp_result_gen;
END_RCPP
}
// rcpp_read_warc
DataFrame rcpp_read_warc(std::string path, std::string filter, std::string include);
DataFrame rcpp_read_warc(std::string const& path, std::string const& filter, std::string const& include);
RcppExport SEXP _sparkwarc_rcpp_read_warc(SEXP pathSEXP, SEXP filterSEXP, SEXP includeSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
Rcpp::traits::input_parameter< std::string >::type filter(filterSEXP);
Rcpp::traits::input_parameter< std::string >::type include(includeSEXP);
Rcpp::traits::input_parameter< std::string const& >::type path(pathSEXP);
Rcpp::traits::input_parameter< std::string const& >::type filter(filterSEXP);
Rcpp::traits::input_parameter< std::string const& >::type include(includeSEXP);
rcpp_result_gen = Rcpp::wrap(rcpp_read_warc(path, filter, include));
return rcpp_result_gen;
END_RCPP
}

static const R_CallMethodDef CallEntries[] = {
{"_sparkwarc_rcpp_hello_world", (DL_FUNC) &_sparkwarc_rcpp_hello_world, 0},
{"_sparkwarc_rcpp_read_warc", (DL_FUNC) &_sparkwarc_rcpp_read_warc, 3},
{NULL, NULL, 0}
};
Expand Down
85 changes: 41 additions & 44 deletions src/warc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,54 @@ using namespace Rcpp;
#include <stdio.h>
#include <zlib.h>

// [[Rcpp::export]]
List rcpp_hello_world() {

CharacterVector x = CharacterVector::create( "foo", "bar" ) ;
NumericVector y = NumericVector::create( 0.0, 1.0 ) ;
List z = List::create( x, y ) ;

return z ;
}

std::size_t rcpp_find_tag(std::string line, std::size_t pos) {
std::size_t tag_start = line.find("<", pos);
if (tag_start != std::string::npos && line.find(">", tag_start + 1)) {
std::size_t rcpp_find_tag(std::string const &line, std::size_t pos) {
auto const tag_start = line.find("<", pos);
if (tag_start != std::string::npos &&
line.find(">", tag_start + 1) != std::string::npos) {
return tag_start;
}

return std::string::npos;
}

constexpr std::size_t kBufSz = 4 * 1024;
constexpr std::size_t kAvgWarcSz = 40 * 1024;
std::string const kWarcSep = "WARC/1.0";

// [[Rcpp::export]]
DataFrame rcpp_read_warc(std::string path,
std::string filter,
std::string include) {
DataFrame rcpp_read_warc(std::string const &path, std::string const &filter,
std::string const &include) {

FILE *fp = fopen(path.c_str(), "rb");
if (!fp) Rcpp::stop("Failed to open WARC file.");
if (!fp)
Rcpp::stop("Failed to open WARC file.");

gzFile gzf = gzdopen(fileno(fp), "rb");
if (!gzf) Rcpp::stop("Failed to open WARC as a compressed file.");
if (!gzf)
Rcpp::stop("Failed to open WARC as a compressed file.");

const int buffer_size = 4 * 1024;
char buffer[buffer_size] = {'\0'};
char buf[kBufSz] = {'\0'};

std::list<std::string> warc_entries;

const int warc_mean_size = 40 * 1024;
std::string warc_entry;
warc_entry.reserve(warc_mean_size);
warc_entry.reserve(kAvgWarcSz);

bool one_matched = false;
const std::string warc_separator = "WARC/1.0";

long stats_tags_total = 0;
std::list<long> warc_stats;

while(gzgets(gzf, buffer, buffer_size) != Z_NULL) {
std::string line(buffer);
while (gzgets(gzf, buf, kBufSz) != Z_NULL) {
std::string line(buf);

if (!filter.empty() && !one_matched) {
one_matched = line.find(filter) != std::string::npos;
}

if (std::string(line).substr(0, warc_separator.size()) == warc_separator && warc_entry.size() > 0) {
if (line.substr(0, kWarcSep.size()) == kWarcSep && warc_entry.size() > 0) {
if (filter.empty() || one_matched) {
warc_entries.push_back(warc_entry);
warc_entries.emplace_back(std::move(warc_entry));
warc_stats.push_back(stats_tags_total);
stats_tags_total = 0;
}
Expand All @@ -68,31 +61,35 @@ DataFrame rcpp_read_warc(std::string path,
warc_entry.clear();
}

if (include.empty() || line.find(include) != std::string::npos) {
warc_entry.append(line);
}

std::size_t tag_start = rcpp_find_tag(line, 0);
while(tag_start != std::string::npos) {
auto tag_start = rcpp_find_tag(line, 0);
while (tag_start != std::string::npos) {
stats_tags_total += 1;
tag_start = rcpp_find_tag(line, tag_start + 1);
}

if (include.empty() || line.find(include) != std::string::npos) {
warc_entry.append(std::move(line));
}
}

if (gzf) gzclose(gzf);
if (fp) fclose(fp);
if (gzf)
gzclose(gzf);
if (fp)
fclose(fp);

long idxEntry = 0;
std::size_t idxEntry = 0;
CharacterVector results(warc_entries.size());
std::for_each(warc_entries.begin(), warc_entries.end(), [&results, &idxEntry](std::string &entry) {
results[idxEntry++] = entry;
});
std::for_each(std::make_move_iterator(warc_entries.begin()),
std::make_move_iterator(warc_entries.end()),
[&results, &idxEntry](std::string &&entry) {
results[idxEntry++] = std::move(entry);
});

long idxStat = 0;
std::size_t idxStat = 0;
NumericVector stats(warc_stats.size());
std::for_each(warc_stats.begin(), warc_stats.end(), [&stats, &idxStat](long &stat) {
stats[idxStat++] = stat;
});
std::for_each(warc_stats.begin(), warc_stats.end(),
[&stats, &idxStat](long &stat) { stats[idxStat++] = stat; });

return DataFrame::create(Named("tags") = stats, _["content"] = results);
return DataFrame::create(Named("tags") = std::move(stats),
_["content"] = std::move(results));
}

0 comments on commit 8d898a7

Please sign in to comment.