Skip to content

Commit

Permalink
Parallelized sentiment scoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Aug 14, 2017
1 parent 54a2346 commit 45013ba
Showing 1 changed file with 90 additions and 0 deletions.
90 changes: 90 additions & 0 deletions Code/SentimentAnalysis/movie_sentiment.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,93 @@ for (modelname in ls(pattern = 'model\\.')) {
}

# Draw ROC curves for the "sentiment" label against every column of `scores`
# whose name contains "Probability".
rxRocCurve(
  actualVarName = "sentiment",
  predVarNames = grep("Probability", names(scores), value = TRUE),
  data = scores
)


# -----------------------------------------
# Parallelized Scoring
# -----------------------------------------

# Store files in HDFS
fs <- RxHdfsFileSystem()

# Start in local compute context
rxSetComputeContext("local")

# Save the model to disk; scoreFn() reads it back from this path
saveRDS(model.rxlr2, "/tmp/sentimentModel.rds")

# Split the test rows into numXdfs XDF data sets in HDFS
numXdfs <- 4
testData <- df[testInd,]
totalRows <- nrow(testData)
# Average number of rows per set (fractional when totalRows is not a
# multiple of numXdfs, so it must not be used directly as a row index)
rowsPerSet <- totalRows / numXdfs

# Integer cumulative boundaries: set k holds rows
# (setBounds[k] + 1) .. setBounds[k + 1]. Using integer division on the
# cumulative count guarantees that every row lands in exactly one set;
# indexing with fractional multiples of rowsPerSet silently drops rows.
setBounds <- (as.numeric(totalRows) * (0:numXdfs)) %/% numXdfs

for (setNum in seq_len(numXdfs)) {
  firstRow <- setBounds[setNum] + 1
  lastRow <- setBounds[setNum + 1]
  # seq_len() gives an empty selection for an empty set, whereas
  # firstRow:lastRow would count backwards and pick up unrelated rows
  rowIdx <- firstRow - 1 + seq_len(lastRow - firstRow + 1)
  testSubset <- testData[rowIdx, ]
  testSubsetXdf <- RxXdfData(paste0("/tmp/testSubsetXdf-s-", setNum), fileSystem = fs)
  # Write XDF to HDFS
  rxDataStep(testSubset, testSubsetXdf, overwrite = TRUE)
}

# Define the scoring function
#
# Featurizes and scores one test subset stored in HDFS.
#
# setNum: suffix identifying the subset; input is read from
#         /tmp/testSubsetXdf-s-<setNum>, intermediate features are written to
#         /tmp/featurizedXdf-s-<setNum> and scores to
#         /tmp/testSubsetScoresXdf-s-<setNum>.
# Returns the value of the rxPredict() call that writes the scores.
scoreFn <- function(setNum) {
  # Created inside the function so it does not depend on the caller's `fs`
  fs <- RxHdfsFileSystem()

  # Load the scoring model
  modelFileName <- "/tmp/sentimentModel.rds"
  model <- readRDS(modelFileName)

  # Input data
  testSubsetXdf <- RxXdfData(paste0("/tmp/testSubsetXdf-s-", setNum), fileSystem = fs)

  # Featurized data
  featurizedXdf <- RxXdfData(paste0("/tmp/featurizedXdf-s-", setNum), fileSystem = fs)

  # Featurize: add a "preSentiment" column computed from "review"
  rxFeaturize(data = testSubsetXdf, outData = featurizedXdf,
              mlTransforms = list(getSentiment(vars = c(preSentiment="review"))),
              overwrite = TRUE, randomSeed = 1)

  # Scores
  scoresXdf <- RxXdfData(paste0("/tmp/testSubsetScoresXdf-s-", setNum), fileSystem = fs)

  # Score; "fname" is carried through to the output alongside the predictions.
  # TRUE is spelled out (not T) to match the rxFeaturize call above and
  # because T is an ordinary, reassignable variable.
  rxPredict(model, data = featurizedXdf, outData = scoresXdf,
            extraVarsToWrite = c("fname"), overwrite = TRUE)
}

# Set environment variables for Spark
if (!file.exists("/dsvm")) {
  # No /dsvm directory: only SPARK_HOME needs to be set
  Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client")
} else {
  # /dsvm exists: set Spark, Hadoop, YARN and Java locations for the
  # Data Science VM, and append the Hadoop binaries to PATH
  do.call(Sys.setenv, list(
    SPARK_HOME = "/dsvm/tools/spark/current",
    HADOOP_HOME = "/opt/hadoop/current",
    YARN_CONF_DIR = "/opt/hadoop/current/etc/hadoop",
    PATH = paste0(Sys.getenv("PATH"), ":/opt/hadoop/current/bin"),
    JAVA_HOME = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
  ))
}

# Launch Spark on YARN
# NOTE(review): executorOverheadMem ("6g") is much larger than executorMem
# ("1g") -- confirm this is intentional.
cc <- rxSparkConnect(reset = TRUE,
                     consoleOutput = TRUE,
                     # numExecutors = 1,
                     executorCores = 2,
                     driverMem = "1g",
                     executorMem = "1g",
                     executorOverheadMem = "6g"
)

# Perform scoring in parallel using rxExec: scoreFn is called once per data
# set, with setNum taking each value in 1..numXdfs.
# The finally clause shuts down the Spark application even when scoring
# raises an error, so a failed run does not leave the application running.
scoreFiles <- tryCatch(
  rxExec(scoreFn, setNum = rxElemArg(seq_len(numXdfs))),
  finally = rxSparkDisconnect(cc)
)

# Check the contents of the first scores file
scoresXdf <- scoreFiles[[1]]
rxGetInfo(scoresXdf, getVarInfo = TRUE)

0 comments on commit 45013ba

Please sign in to comment.