Commit

Update plugin doc

seddonm1 committed Sep 22, 2019
1 parent 3d158c5 commit bb999d7

Showing 2 changed files with 57 additions and 67 deletions.
65 changes: 30 additions & 35 deletions docs-src/content/plugins/index.md
@@ -19,15 +19,15 @@ Plugins are resolved dynamically at runtime by name and version

Assuming we wanted to execute a `KafkaExtract` [Pipeline Stage Plugin](#pipeline-stage-plugins):

{{< readfile file="/resources/docs_resources_plugins/KafkaExtractMin" highlight="json" >}}

Arc will attempt to resolve the plugin by first looking in the `META-INF/services` files of all included `JAR` files (e.g. https://github.com/tripl-ai/arc-kafka-pipeline-plugin/blob/master/src/main/resources/META-INF/services/ai.tripl.arc.plugins.PipelineStagePlugin) for classes that extend `PipelineStagePlugin` - which the `KafkaExtract` plugin does:

```scala
class KafkaExtract extends PipelineStagePlugin {
```

Arc is then able to resolve the plugin by matching on `simpleName` - in this case `KafkaExtract` - and calling the `instantiate()` method to create an instance of the plugin, which Arc executes at the appropriate time depending on the plugin type.
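
This lookup follows standard Java service-loading semantics. Below is a minimal sketch of how this style of resolution can work, assuming plain `java.util.ServiceLoader` behaviour - Arc's actual resolver additionally handles package-qualified names and versions:

```scala
// Minimal sketch of service-based plugin resolution (assumes plain
// java.util.ServiceLoader semantics; not Arc's actual resolver).
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import ai.tripl.arc.plugins.PipelineStagePlugin

// load every PipelineStagePlugin registered via META-INF/services
val plugins = ServiceLoader.load(classOf[PipelineStagePlugin]).iterator.asScala.toList

// match on simpleName; the caller would then invoke instantiate()
val resolved = plugins.find(_.getClass.getSimpleName == "KafkaExtract")
```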

To allow more specificity you can use either the full package name and/or include the version:
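
For example, a hypothetical stage definition pinning both the package and the version might look like the following (illustrative values only; check the Arc documentation for the exact form):

```json
{
  "type": "ai.tripl.arc.extract.KafkaExtract:1.0.0",
  "name": "extract events from kafka",
  "environments": ["production", "test"]
}
```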

@@ -107,7 +107,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
val invalidKeys = checkValidKeys(c)(expectedKeys)

(returnName, lagDays, leadDays, formatter, currentDate, invalidKeys) match {
case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) =>

val res = (lagDays * -1 to leadDays).map { v =>
formatter.format(currentDate.plusDays(v))
@@ -122,8 +122,8 @@
val err = StageError(index, this.getClass.getName, c.origin.lineNumber, allErrors)
Left(err :: Nil)
}
}

def parseFormatter(path: String)(formatter: String)(implicit c: Config): Either[Errors, DateTimeFormatter] = {
def err(lineNumber: Option[Int], msg: String): Either[Errors, DateTimeFormatter] = Left(ConfigError(path, lineNumber, msg) :: Nil)

@@ -132,7 +132,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
} catch {
case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage)
}
}

def parseCurrentDate(path: String, formatter: DateTimeFormatter)(value: String)(implicit c: Config): Either[Errors, LocalDate] = {
def err(lineNumber: Option[Int], msg: String): Either[Errors, LocalDate] = Left(ConfigError(path, lineNumber, msg) :: Nil)
@@ -142,7 +142,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin {
} catch {
case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage)
}
}
}

```
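
The `instantiate` method above demonstrates a pattern used throughout Arc plugins: parse each configuration field into an `Either`, then pattern match on the tuple of results so that all configuration errors are collected and reported together rather than failing on the first one. A standalone sketch of the same pattern, with simplified assumptions (plain string errors rather than Arc's `ConfigError`):

```scala
// Standalone sketch of the Either-based validation pattern
// (assumption: errors are plain strings, not Arc's ConfigError type).
def parseNonNegative(path: String)(value: String): Either[List[String], Int] =
  try {
    val i = value.toInt
    if (i >= 0) Right(i) else Left(List(s"$path: must be non-negative"))
  } catch {
    case _: NumberFormatException => Left(List(s"$path: '$value' is not an integer"))
  }

val lagDays = parseNonNegative("lagDays")("10")
val leadDays = parseNonNegative("leadDays")("1")

(lagDays, leadDays) match {
  // succeed only when every field parsed successfully
  case (Right(lag), Right(lead)) =>
    println((lag * -1 to lead).toList)
  // otherwise collect all errors and report them together
  case _ =>
    val allErrors = List(lagDays, leadDays).collect { case Left(errs) => errs }.flatten
    println(s"invalid configuration: ${allErrors.mkString("; ")}")
}
```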
@@ -196,14 +196,11 @@ Custom `Lifecycle Plugins` allow users to extend the base Arc framework with logic
```scala
package ai.tripl.arc.plugins.lifecycle

import org.apache.spark.sql.{DataFrame, SparkSession}

import ai.tripl.arc.api.API._
import ai.tripl.arc.plugins.LifecyclePlugin
import ai.tripl.arc.util.Utils
import ai.tripl.arc.config.Error._

class DataFramePrinter extends LifecyclePlugin {
@@ -221,7 +218,7 @@ class DataFramePrinter extends LifecyclePlugin {
val invalidKeys = checkValidKeys(c)(expectedKeys)

(numRows, truncate, invalidKeys) match {
case (Right(numRows), Right(truncate), Right(invalidKeys)) =>
Right(DataFramePrinterInstance(
plugin=this,
numRows=numRows,
@@ -241,30 +238,28 @@ case class DataFramePrinterInstance(
truncate: Boolean
) extends LifecyclePluginInstance {

override def before(stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
  logger.trace()
    .field("event", "before")
    .field("stage", stage.name)
    .log()
}

override def after(result: Option[DataFrame], stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
  logger.trace()
    .field("event", "after")
    .field("stage", stage.name)
    .log()

result match {
case Some(df) => df.show(numRows, truncate)
case None =>
}
}
}

```

The plugin then needs to be registered by adding the full plugin name to your project's `/resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin` file.
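
For the `DataFramePrinter` example above, the registration file would contain the fully qualified class name, one plugin per line:

```
ai.tripl.arc.plugins.lifecycle.DataFramePrinter
```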

To execute:
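
A hypothetical job configuration sketch - the `plugins.lifecycle` key and its layout are assumptions to verify against the Arc configuration documentation, while `numRows` and `truncate` are the parameters read by `instantiate` above:

```json
{
  "plugins": {
    "lifecycle": [
      {
        "type": "ai.tripl.arc.plugins.lifecycle.DataFramePrinter",
        "environments": ["production", "test"],
        "numRows": 10,
        "truncate": false
      }
    ]
  }
}
```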

@@ -307,7 +302,7 @@ Ensure that any stage with a `mapPartitions` or `map` DataFrame does not require the `PipelineStage` instance to be passed into the `map` function. So instead of doing something like:

```scala
val transformedDF = try {
df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
val uri = stage.uri.toString
```

@@ -317,7 +312,7 @@ Declare the variables outside the map function so that `stage` does not have to
val stageUri = stage.uri

val transformedDF = try {
df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
val uri = stageUri.toString
```
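
A self-contained sketch of the same fix (`ExampleStage`, `TransformedRow` and the local Spark session are illustrative assumptions, not Arc types):

```scala
import org.apache.spark.sql.SparkSession

// hypothetical output row type, for illustration only
case class TransformedRow(uri: String, value: Long)

// hypothetical stage: not Serializable, so it must not be captured by the closure
class ExampleStage { val uri = new java.net.URI("s3a://bucket/path") }

object ClosureCaptureExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("closure-capture").getOrCreate()
    import spark.implicits._

    val stage = new ExampleStage

    // copy the field into local vals so Spark serialises only the String,
    // not the whole stage instance
    val stageUri = stage.uri
    val uriString = stageUri.toString

    val df = spark.range(0, 10).toDF("value")
    val transformedDF = df.map { row => TransformedRow(uriString, row.getLong(0)) }

    transformedDF.show(truncate = false)
    spark.stop()
  }
}
```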

@@ -339,10 +334,10 @@ class ConsoleLoad extends PipelineStagePlugin {
val inputView = getValue[String]("inputView")
val outputMode = getValue[String]("outputMode", default = Some("Append"), validValues = "Append" :: "Complete" :: "Update" :: Nil) |> parseOutputModeType("outputMode") _
val params = readMap("params", c)
val invalidKeys = checkValidKeys(c)(expectedKeys)

(name, description, inputView, outputMode, invalidKeys) match {
case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) =>
val stage = ConsoleLoadStage(
plugin=this,
name=name,
@@ -352,8 +347,8 @@
params=params
)

stage.stageDetail.put("inputView", stage.inputView)
stage.stageDetail.put("outputMode", stage.outputMode.sparkString)
stage.stageDetail.put("inputView", stage.inputView)
stage.stageDetail.put("outputMode", stage.outputMode.sparkString)
stage.stageDetail.put("params", params.asJava)

Right(stage)
@@ -369,10 +364,10 @@

case class ConsoleLoadStage(
plugin: ConsoleLoad,
name: String,
description: Option[String],
inputView: String,
outputMode: OutputModeType,
params: Map[String, String]
) extends PipelineStage {

@@ -386,13 +381,13 @@ object ConsoleLoadStage {

def execute(stage: ConsoleLoadStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {

val df = spark.table(stage.inputView)

if (!df.isStreaming) {
throw new Exception("ConsoleLoad can only be executed in streaming mode.") with DetailException {
override val detail = stage.stageDetail
}
}

df.writeStream
.format("console")
@@ -431,7 +426,7 @@ To execute:
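
A hypothetical stage configuration sketch - the field names follow the `instantiate` method above, but the exact values should be checked against the Arc documentation:

```json
{
  "type": "ConsoleLoad",
  "name": "write the streaming inputView to the console",
  "environments": ["test"],
  "inputView": "inputStream",
  "outputMode": "Append"
}
```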
The inbuilt [Spark SQL Functions](https://spark.apache.org/docs/latest/api/sql/index.html) are heavily optimised by the internal Spark code - down to the generated byte code - to a level that custom User Defined Functions cannot match, so where possible it is better to use the inbuilt functions.
{{</note>}}

`User Defined Functions` allow users to extend the [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect.

Arc already includes [some additional functions](partials/#user-defined-functions) which are not included in the base [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect, and any useful generic functions can be contributed to the [Arc repository](https://github.com/tripl-ai/arc) so that others can benefit.
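
As a minimal sketch of what extending the dialect looks like using the plain Spark API (the function below is an illustrative assumption, not one of Arc's inbuilt additions):

```scala
import org.apache.spark.sql.SparkSession

object UdfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("udf-example").getOrCreate()

    // register a custom function into the Spark SQL dialect
    spark.udf.register("double_it", (value: Long) => value * 2L)

    // the function is now callable from any SQL statement
    spark.sql("SELECT double_it(21) AS answer").show()

    spark.stop()
  }
}
```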

59 changes: 27 additions & 32 deletions docs/plugins/index.html
