Skip to content

Commit

Permalink
Update based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
seddonm1 committed Sep 23, 2019
1 parent bb999d7 commit 19fbed1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
9 changes: 2 additions & 7 deletions src/main/scala/ai/tripl/arc/ARC.scala
Original file line number Diff line number Diff line change
Expand Up @@ -422,18 +422,13 @@ object ARC {
def runStages(stages: List[(PipelineStage, Int)]): Option[DataFrame] = {
stages match {
case Nil => None // end
case head :: Nil =>
val stage = head._1
val index = head._2
case (stage, index) :: Nil =>
before(stage, index, pipeline.stages)
val result = processStage(stage)
after(result, stage, index, pipeline.stages)
result

//currentValue[, index[, array]]
case head :: tail =>
val stage = head._1
val index = head._2
case (stage, index) :: tail =>
before(stage, index, pipeline.stages)
val result = processStage(stage)
after(result, stage, index, pipeline.stages)
Expand Down
29 changes: 16 additions & 13 deletions src/main/scala/ai/tripl/arc/transform/SimilarityJoinTransform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,35 +118,33 @@ object SimilarityJoinTransformStage {

def execute(stage: SimilarityJoinTransformStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {

// create a guid to name the two derived columns (leftView and rightView) to avoid collisions with existing column names
val uuid = UUID.randomUUID.toString

// split input string into individual characters
val regexTokenizer = { new RegexTokenizer()
val regexTokenizer = new RegexTokenizer()
.setInputCol(uuid)
.setPattern("")
.setMinTokenLength(1)
.setToLowercase(!stage.caseSensitive)
}

// produce ngrams to group the characters
val nGram = { new NGram()
val nGram = new NGram()
.setInputCol(regexTokenizer.getOutputCol)
.setN(stage.shingleLength)
}

// convert to vector
val countVectorizer = { new CountVectorizer()
val countVectorizer = new CountVectorizer()
.setInputCol(nGram.getOutputCol)
}

// build locality-sensitive hashing model
val minHashLSH = { new MinHashLSH()
val minHashLSH = new MinHashLSH()
.setInputCol(countVectorizer.getOutputCol)
.setNumHashTables(stage.numHashTables)
.setOutputCol("lsh")
}

val pipeline = new Pipeline().setStages(Array(regexTokenizer, nGram, countVectorizer, minHashLSH))
val pipeline = new Pipeline()
.setStages(Array(regexTokenizer, nGram, countVectorizer, minHashLSH))

val transformedDF = try {

Expand All @@ -169,11 +167,16 @@ object SimilarityJoinTransformStage {
val leftOutputColumns = leftView.columns.map{columnName => col(s"datasetA.${columnName}")}
val rightOutputColumns = rightView.columns.map{columnName => col(s"datasetB.${columnName}")}

pipelineModel.stages(3).asInstanceOf[MinHashLSHModel]
.approxSimilarityJoin(datasetA, datasetB, (1.0-stage.threshold))
.select((leftOutputColumns ++ rightOutputColumns ++ List((lit(1.0)-col("distCol")).alias("similarity"))):_*)
pipelineModel.stages.collectFirst{ case minHashLSHModel: MinHashLSHModel => minHashLSHModel } match {
case Some(minHashLSHModel) => {
minHashLSHModel
.approxSimilarityJoin(datasetA, datasetB, (1.0-stage.threshold))
.select((leftOutputColumns ++ rightOutputColumns ++ Seq((lit(1.0)-col("distCol")).alias("similarity"))):_*)
}
case None => throw new Exception("could not find MinHashLSHModel in trained model")
}

} catch {
} catch {
case e: Exception => throw new Exception(e) with DetailException {
override val detail = stage.stageDetail
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SimilarityJoinTransformSuite extends FunSuite with BeforeAndAfter {

val rightDF = Seq(
(0L,"U3 59 INVERNESS AVENUE","NSW 2222 PENSHURST"),
(1L,"74 CANYON DR", "NSW 2768 STANEHOPE GDNS."),
(1L,"74 CANYON DR", "NSW 2768 STANEHOPE GDNS.")
).toDF("id", "street", "state_postcode_suburb")
rightDF.createOrReplaceTempView(rightView)

Expand Down

0 comments on commit 19fbed1

Please sign in to comment.