Merge pull request tripl-ai#14 from tripl-ai/feature/2.1.0
Feature/2.1.0
seddonm1 authored Sep 23, 2019
2 parents e746838 + caf5da2 commit 0dd6ffe
Showing 34 changed files with 5,538 additions and 5,444 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
## Change Log

# 2.1.0

- add `SimilarityJoinTransform`, a stage which performs a [fuzzy match](https://en.wikipedia.org/wiki/Approximate_string_matching) and can be used for dataset deduplication or approximate joins.
- add missing types `BooleanList`, `Double`, `DoubleList`, `LongList` to config reader.

**BREAKING**
- change API for `LifecyclePlugin` to pass the stage index and the full job pipeline so that the current and other stages can be accessed in the plugin.

# 2.0.1

- update to [Spark 2.4.4](https://spark.apache.org/releases/spark-release-2-4-4.html).
4 changes: 2 additions & 2 deletions docs-src/config.toml
@@ -20,9 +20,9 @@ pygmentsUseClasses=true
repo_url = "https://github.com/tripl-ai/arc"

image = "triplai/arc"
version = "2.0.1"
version = "2.1.0"
arc_jupyter_image= "triplai/arc-jupyter"
arc_jupyter_version = "1.3.1"
arc_jupyter_version = "1.4.0"
spark_version = "2.4.4"
scala_version = "2.11"
hadoop_version = "2.9.2"
65 changes: 30 additions & 35 deletions docs-src/content/plugins/index.md
@@ -19,15 +19,15 @@ Plugins are resolved dynamically at runtime by name and version

Assuming we wanted to execute a `KafkaExtract` [Pipeline Stage Plugin](#pipeline-stage-plugins):

{{< readfile file="/resources/docs_resources_plugins/KafkaExtractMin" highlight="json" >}}
{{< readfile file="/resources/docs_resources_plugins/KafkaExtractMin" highlight="json" >}}

Arc will attempt to resolve the plugin by first looking in the `META-INF` directories of all included `JAR` files (https://github.com/tripl-ai/arc-kafka-pipeline-plugin/blob/master/src/main/resources/META-INF/services/ai.tripl.arc.plugins.PipelineStagePlugin) for classes that extend `PipelineStagePlugin`, which the `KafkaExtract` plugin does:

```scala
class KafkaExtract extends PipelineStagePlugin {
```

Arc then resolves the plugin by matching on `simpleName` - in this case `KafkaExtract` - and calls the `instantiate()` method to create an instance of the plugin, which Arc executes at the appropriate time depending on the plugin type.
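
Conceptually this lookup behaves like the standard JVM `ServiceLoader` mechanism. The sketch below is an illustration only - it is not Arc's actual resolver, which also handles package-qualified names and versions:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import ai.tripl.arc.plugins.PipelineStagePlugin

// load every PipelineStagePlugin registered via META-INF/services and
// match on either the simple or the fully qualified class name
def resolvePlugin(name: String, loader: ClassLoader): Option[PipelineStagePlugin] = {
  ServiceLoader.load(classOf[PipelineStagePlugin], loader).asScala.find { plugin =>
    plugin.getClass.getSimpleName == name || plugin.getClass.getName == name
  }
}
```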

To allow more specificity you can use the full package name and/or include the version:

@@ -107,7 +107,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
val invalidKeys = checkValidKeys(c)(expectedKeys)

(returnName, lagDays, leadDays, formatter, currentDate, invalidKeys) match {
case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) =>

val res = (lagDays * -1 to leadDays).map { v =>
formatter.format(currentDate.plusDays(v))
@@ -122,8 +122,8 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
val err = StageError(index, this.getClass.getName, c.origin.lineNumber, allErrors)
Left(err :: Nil)
}
}

def parseFormatter(path: String)(formatter: String)(implicit c: Config): Either[Errors, DateTimeFormatter] = {
def err(lineNumber: Option[Int], msg: String): Either[Errors, DateTimeFormatter] = Left(ConfigError(path, lineNumber, msg) :: Nil)

Expand All @@ -132,7 +132,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
} catch {
case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage)
}
}

def parseCurrentDate(path: String, formatter: DateTimeFormatter)(value: String)(implicit c: Config): Either[Errors, LocalDate] = {
def err(lineNumber: Option[Int], msg: String): Either[Errors, LocalDate] = Left(ConfigError(path, lineNumber, msg) :: Nil)
Expand All @@ -142,7 +142,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
} catch {
case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage)
}
}
}

```
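
To make the date-range logic above concrete, here is a standalone sketch of the `(lagDays * -1 to leadDays)` expression with assumed values (`lagDays = 2`, `leadDays = 1`, an ISO date formatter and a fixed current date):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// assumed values for illustration only
val lagDays = 2
val leadDays = 1
val formatter = DateTimeFormatter.ISO_LOCAL_DATE
val currentDate = LocalDate.of(2019, 9, 23)

// the plugin builds the list of dates from lagDays before to leadDays after currentDate
val res = (lagDays * -1 to leadDays).map { v =>
  formatter.format(currentDate.plusDays(v))
}
// res: Vector(2019-09-21, 2019-09-22, 2019-09-23, 2019-09-24)
```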
@@ -196,14 +196,11 @@ Custom `Lifecycle Plugins` allow users to extend the base Arc framework with log
```scala
package ai.tripl.arc.plugins.lifecycle

import java.util

import org.apache.spark.sql.{DataFrame, SparkSession}

import ai.tripl.arc.api.API._
import ai.tripl.arc.plugins.LifecyclePlugin
import ai.tripl.arc.util.Utils
import ai.tripl.arc.util.log.logger.Logger
import ai.tripl.arc.config.Error._

class DataFramePrinter extends LifecyclePlugin {
@@ -221,7 +218,7 @@ class DataFramePrinter extends LifecyclePlugin {
val invalidKeys = checkValidKeys(c)(expectedKeys)

(numRows, truncate, invalidKeys) match {
case (Right(numRows), Right(truncate), Right(invalidKeys)) =>
Right(DataFramePrinterInstance(
plugin=this,
numRows=numRows,
@@ -241,30 +238,28 @@ case class DataFramePrinterInstance(
truncate: Boolean
) extends LifecyclePluginInstance {

- override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+ override def before(stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
    logger.trace()
      .field("event", "before")
      .field("stage", stage.name)
      .log()
  }

- override def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+ override def after(result: Option[DataFrame], stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
    logger.trace()
      .field("event", "after")
      .field("stage", stage.name)
-     .field("isLast", java.lang.Boolean.valueOf(isLast))
      .log()

    result match {
      case Some(df) => df.show(numRows, truncate)
      case None =>
    }
  }
}

```

The plugin then needs to be registered by adding the full plugin name to your project's `/resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin` file.
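
For the `DataFramePrinter` example above, the services file would contain the fully qualified class name on its own line:

```
ai.tripl.arc.plugins.lifecycle.DataFramePrinter
```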

To execute:

@@ -307,7 +302,7 @@ Ensure that any stage with a `mapPartitions` or `map` DataFrame does not require

```scala
val transformedDF = try {
df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
val uri = stage.uri.toString
```

@@ -317,7 +312,7 @@ Declare the variables outside the map function so that `stage` does not have to
val stageUri = stage.uri

val transformedDF = try {
df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
val uri = stageUri.toString
```

@@ -339,10 +334,10 @@ class ConsoleLoad extends PipelineStagePlugin {
val inputView = getValue[String]("inputView")
val outputMode = getValue[String]("outputMode", default = Some("Append"), validValues = "Append" :: "Complete" :: "Update" :: Nil) |> parseOutputModeType("outputMode") _
val params = readMap("params", c)
val invalidKeys = checkValidKeys(c)(expectedKeys)

(name, description, inputView, outputMode, invalidKeys) match {
case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) =>
val stage = ConsoleLoadStage(
plugin=this,
name=name,
Expand All @@ -352,8 +347,8 @@ class ConsoleLoad extends PipelineStagePlugin {
params=params
)

stage.stageDetail.put("inputView", stage.inputView)
stage.stageDetail.put("outputMode", stage.outputMode.sparkString)
stage.stageDetail.put("inputView", stage.inputView)
stage.stageDetail.put("outputMode", stage.outputMode.sparkString)
stage.stageDetail.put("params", params.asJava)

Right(stage)
@@ -369,10 +364,10 @@

case class ConsoleLoadStage(
plugin: ConsoleLoad,
name: String,
description: Option[String],
inputView: String,
outputMode: OutputModeType,
params: Map[String, String]
) extends PipelineStage {

@@ -386,13 +381,13 @@ object ConsoleLoadStage {

def execute(stage: ConsoleLoadStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {

val df = spark.table(stage.inputView)

if (!df.isStreaming) {
throw new Exception("ConsoleLoad can only be executed in streaming mode.") with DetailException {
override val detail = stage.stageDetail
}
}

df.writeStream
.format("console")
@@ -431,7 +426,7 @@ To execute:
The inbuilt [Spark SQL Functions](https://spark.apache.org/docs/latest/api/sql/index.html) are heavily optimised by the internal Spark code (down to the generated byte code) to a level which custom User Defined Functions cannot be - so where possible it is better to use the inbuilt functions.
{{</note>}}

`User Defined Functions` allow users to extend the [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect.

Arc already includes [some additional functions](partials/#user-defined-functions) which are not included in the base [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect, and any useful generic functions can be contributed to the [Arc repository](https://github.com/tripl-ai/arc) so that others can benefit.
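
A User Defined Function is ultimately registered against the active Spark session so that it becomes callable from SQL statements (for example inside a `SQLTransform` stage). A minimal sketch of that underlying registration, shown outside of Arc's plugin mechanism and using a hypothetical function name and logic:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("udf-sketch").getOrCreate()

// hypothetical helper: collapse repeated whitespace in a string
spark.udf.register("normalise_whitespace", (s: String) => s.trim.replaceAll("\\s+", " "))

// once registered, the function can be called from any SQL statement
spark.sql("SELECT normalise_whitespace('  1  Parliament   Drive ') AS value").show(false)
```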

38 changes: 36 additions & 2 deletions docs-src/content/transform/index.md
@@ -271,6 +271,40 @@ The `MLTransform` stage transforms the incoming dataset with a pretrained Spark
#### Complete
{{< readfile file="/resources/docs_resources/MLTransformComplete" highlight="json" >}}

## SimilarityJoinTransform
##### Since: 2.1.0 - Supports Streaming: True

The `SimilarityJoinTransform` stage uses [Approximate String Matching](https://en.wikipedia.org/wiki/Approximate_string_matching) (a.k.a. Fuzzy Matching) to find similar records between two datasets. It is possible to pass the same dataset into both the `leftView` and `rightView` to find duplicates (in which case the `threshold` value should be set high to avoid a potentially very large cross-product result set).

### Parameters

| Attribute | Type | Required | Description |
|-----------|------|----------|-------------|
|name|String|true|{{< readfile file="/content/partials/fields/stageName.md" markdown="true" >}}|
|environments|Array[String]|true|{{< readfile file="/content/partials/fields/environments.md" markdown="true" >}}|
|leftView|String|true|The view name of the `left` dataset. This should be the bigger of the two input sets.|
|rightView|String|true|The view name of the `right` dataset.|
|leftFields|Array[String]|true|Columns to include in the similarity join from the `left` dataset. These are order dependent.|
|rightFields|Array[String]|true|Columns to include in the similarity join from the `right` dataset. These are order dependent.|
|outputView|String|true|{{< readfile file="/content/partials/fields/outputView.md" markdown="true" >}}|
|caseSensitive|Boolean|false|Whether to use case sensitive comparison.<br><br>Default: `false`.|
|description|String|false|{{< readfile file="/content/partials/fields/description.md" markdown="true" >}}|
|numHashTables|Integer|false|The number of hash tables which can be used to trade off execution time vs. false positive rate. Lower values should produce quicker execution but a higher false positive rate.<br><br>Default: `5`.|
|numPartitions|Integer|false|{{< readfile file="/content/partials/fields/numPartitions.md" markdown="true" >}}|
|params|Map[String, String]|false|{{< readfile file="/content/partials/fields/params.md" markdown="true" >}} Currently unused.|
|partitionBy|Array[String]|false|{{< readfile file="/content/partials/fields/partitionBy.md" markdown="true" >}}|
|persist|Boolean|false|{{< readfile file="/content/partials/fields/persist.md" markdown="true" >}}|
|shingleLength|Integer|false|The length to split the input fields into. E.g. the string `1 Parliament Drive` would be split into [`1 P`, ` Pa`, `Par`, `arl`...] if `shingleLength` is set to `3`. A longer or shorter `shingleLength` may produce better matches depending on your dataset.<br><br>Default: `3`.|
|threshold|Double|false|The similarity threshold for evaluating the records as the same. The default, `0.8`, means that 80% of the character sequences must be the same for the records to be considered equal for joining.<br><br>Default: `0.8`.|
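
The combination of `shingleLength`, `numHashTables` and `threshold` corresponds to a shingling plus locality-sensitive-hashing style of approximate join. As a conceptual illustration only (this is not necessarily how `SimilarityJoinTransform` is implemented internally, and the sample data is invented), a similar join can be sketched with Spark ML's `MinHashLSH`:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, RegexTokenizer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("similarity-sketch").getOrCreate()
import spark.implicits._

// two small invented inputs standing in for leftView and rightView
val left = Seq("1 Parliament Drive", "10 Downing Street").toDF("value")
val right = Seq("1 Parliament Driv", "10 Downing St").toDF("value")

// tokenize into single (lowercased) characters, build 3-token shingles, hash into term-frequency vectors
val featurise = new Pipeline().setStages(Array(
  new RegexTokenizer().setInputCol("value").setOutputCol("chars").setPattern(".").setGaps(false),
  new NGram().setInputCol("chars").setOutputCol("shingles").setN(3),
  new HashingTF().setInputCol("shingles").setOutputCol("features")
)).fit(left)

val leftFeatures = featurise.transform(left)
val rightFeatures = featurise.transform(right)

// MinHash indexes the vectors so candidate pairs are found without a full cross product
val lsh = new MinHashLSH().setInputCol("features").setOutputCol("hashes").setNumHashTables(5).fit(leftFeatures)

// a similarity threshold of 0.8 corresponds roughly to a Jaccard distance below 0.2
lsh.approxSimilarityJoin(leftFeatures, rightFeatures, 0.2, "jaccardDistance")
  .select("datasetA.value", "datasetB.value", "jaccardDistance")
  .show(false)
```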

### Examples

#### Minimal
{{< readfile file="/resources/docs_resources/SimilarityJoinTransformMin" highlight="json" >}}

#### Complete
{{< readfile file="/resources/docs_resources/SimilarityJoinTransformComplete" highlight="json" >}}


## SQLTransform
##### Since: 1.0.0 - Supports Streaming: True
@@ -345,13 +379,13 @@ The `TensorFlowServingTransform` stage transforms the incoming dataset by callin
|inputView|String|true|{{< readfile file="/content/partials/fields/inputView.md" markdown="true" >}}|
|outputView|String|true|{{< readfile file="/content/partials/fields/outputView.md" markdown="true" >}}|
|uri|String|true|The `URI` of the TensorFlow Serving REST end point.|
|batchSize|Int|false|The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient.|
|batchSize|Integer|false|The number of records to send to TensorFlow Serving in each call. A higher number will decrease the number of calls to TensorFlow Serving which may be more efficient.|
|description|String|false|{{< readfile file="/content/partials/fields/description.md" markdown="true" >}}|
|inputField|String|false|The field to pass to the model. JSON encoding can be used to pass multiple values (tuples).<br><br>Default: `value`.|
|numPartitions|Integer|false|{{< readfile file="/content/partials/fields/numPartitions.md" markdown="true" >}}|
|params|Map[String, String]|false|{{< readfile file="/content/partials/fields/params.md" markdown="true" >}} Currently unused.|
|partitionBy|Array[String]|false|{{< readfile file="/content/partials/fields/partitionBy.md" markdown="true" >}}|
|persist|Boolean|true|{{< readfile file="/content/partials/fields/persist.md" markdown="true" >}}|
|persist|Boolean|false|{{< readfile file="/content/partials/fields/persist.md" markdown="true" >}}|
|responseType|String|false|The type returned by the TensorFlow Serving API. Expected to be `integer`, `double` or `object` (which may present as a `string` depending on how the model has been built).<br><br>Default: `object`.|
|signatureName|String|false|{{< readfile file="/content/partials/fields/signatureName.md" markdown="true" >}}|

