diff --git a/docs-src/content/plugins/index.md b/docs-src/content/plugins/index.md index 63eba5dd..615acb49 100644 --- a/docs-src/content/plugins/index.md +++ b/docs-src/content/plugins/index.md @@ -19,7 +19,7 @@ Plugins are resolved dynamically at runtime and are resolved by name and version Assuming we wanted to execute a `KafkaExtract` [Pipeline Stage Plugin](#pipeline-stage-plugins): -{{< readfile file="/resources/docs_resources_plugins/KafkaExtractMin" highlight="json" >}} +{{< readfile file="/resources/docs_resources_plugins/KafkaExtractMin" highlight="json" >}} Arc will attempt to resolve the plugin by first looking in all the `META-INF` directories of all included `JAR` files (https://github.com/tripl-ai/arc-kafka-pipeline-plugin/blob/master/src/main/resources/META-INF/services/ai.tripl.arc.plugins.PipelineStagePlugin) for classes that extend `PipelineStagePlugin` which the `KafkaExtract` plugin does: @@ -27,7 +27,7 @@ Arc will attempt to resolve the plugin by first looking in all the `META-INF` di class KafkaExtract extends PipelineStagePlugin { ``` -Arc is then able to resolve the plugin by matching on `simpleName` - in this case `KafkaExtract` - and then call the `instantiate()` method to create an instance of the plugin which is executed by Arc at the appropriate time depending on plugin type. +Arc is then able to resolve the plugin by matching on `simpleName` - in this case `KafkaExtract` - and then call the `instantiate()` method to create an instance of the plugin which is executed by Arc at the appropriate time depending on plugin type. 
To allow more specitivity you can use either the full package name and/or include the version: @@ -107,7 +107,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin { val invalidKeys = checkValidKeys(c)(expectedKeys) (returnName, lagDays, leadDays, formatter, currentDate, invalidKeys) match { - case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) => + case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) => val res = (lagDays * -1 to leadDays).map { v => formatter.format(currentDate.plusDays(v)) @@ -122,8 +122,8 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin { val err = StageError(index, this.getClass.getName, c.origin.lineNumber, allErrors) Left(err :: Nil) } - } - + } + def parseFormatter(path: String)(formatter: String)(implicit c: Config): Either[Errors, DateTimeFormatter] = { def err(lineNumber: Option[Int], msg: String): Either[Errors, DateTimeFormatter] = Left(ConfigError(path, lineNumber, msg) :: Nil) @@ -132,7 +132,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin { } catch { case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage) } - } + } def parseCurrentDate(path: String, formatter: DateTimeFormatter)(value: String)(implicit c: Config): Either[Errors, LocalDate] = { def err(lineNumber: Option[Int], msg: String): Either[Errors, LocalDate] = Left(ConfigError(path, lineNumber, msg) :: Nil) @@ -142,7 +142,7 @@ class DeltaPeriodDynamicConfigurationPlugin extends DynamicConfigurationPlugin { } catch { case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage) } - } + } } ``` @@ -196,14 +196,11 @@ Custom `Lifecycle Plugins` allow users to extend the base Arc framework with log ```scala package ai.tripl.arc.plugins.lifecycle -import java.util - import org.apache.spark.sql.{DataFrame, 
SparkSession} import ai.tripl.arc.api.API._ import ai.tripl.arc.plugins.LifecyclePlugin import ai.tripl.arc.util.Utils -import ai.tripl.arc.util.log.logger.Logger import ai.tripl.arc.config.Error._ class DataFramePrinter extends LifecyclePlugin { @@ -221,7 +218,7 @@ class DataFramePrinter extends LifecyclePlugin { val invalidKeys = checkValidKeys(c)(expectedKeys) (numRows, truncate, invalidKeys) match { - case (Right(numRows), Right(truncate), Right(invalidKeys)) => + case (Right(numRows), Right(truncate), Right(invalidKeys)) => Right(DataFramePrinterInstance( plugin=this, numRows=numRows, @@ -241,30 +238,28 @@ case class DataFramePrinterInstance( truncate: Boolean ) extends LifecyclePluginInstance { - override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { - logger.trace() + override def before(stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { + logger.trace() .field("event", "before") .field("stage", stage.name) - .log() + .log() } - override def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { - logger.trace() + override def after(result: Option[DataFrame], stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { + logger.trace() .field("event", "after") .field("stage", stage.name) - .field("isLast", java.lang.Boolean.valueOf(isLast)) - .log() + .log() result match { case Some(df) => df.show(numRows, truncate) case None => } - } + } } - ``` -The plugin then needs to be registered by adding the full plugin name must be listed in your project’s `/resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin` file. 
+The plugin then needs to be registered by adding the full plugin name must be listed in your project's `/resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin` file. To execute: @@ -307,7 +302,7 @@ Ensure that any stage with a `mapPartitions` or `map` DataFrame does not require ```scala val transformedDF = try { - df.mapPartitions[TransformedRow] { partition: Iterator[Row] => + df.mapPartitions[TransformedRow] { partition: Iterator[Row] => val uri = stage.uri.toString ``` @@ -317,7 +312,7 @@ Declare the variables outside the map function so that `stage` does not have to val stageUri = stage.uri val transformedDF = try { - df.mapPartitions[TransformedRow] { partition: Iterator[Row] => + df.mapPartitions[TransformedRow] { partition: Iterator[Row] => val uri = stageUri.toString ``` @@ -339,10 +334,10 @@ class ConsoleLoad extends PipelineStagePlugin { val inputView = getValue[String]("inputView") val outputMode = getValue[String]("outputMode", default = Some("Append"), validValues = "Append" :: "Complete" :: "Update" :: Nil) |> parseOutputModeType("outputMode") _ val params = readMap("params", c) - val invalidKeys = checkValidKeys(c)(expectedKeys) + val invalidKeys = checkValidKeys(c)(expectedKeys) (name, description, inputView, outputMode, invalidKeys) match { - case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) => + case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) => val stage = ConsoleLoadStage( plugin=this, name=name, @@ -352,8 +347,8 @@ class ConsoleLoad extends PipelineStagePlugin { params=params ) - stage.stageDetail.put("inputView", stage.inputView) - stage.stageDetail.put("outputMode", stage.outputMode.sparkString) + stage.stageDetail.put("inputView", stage.inputView) + stage.stageDetail.put("outputMode", stage.outputMode.sparkString) stage.stageDetail.put("params", params.asJava) Right(stage) @@ -369,10 +364,10 @@ class ConsoleLoad extends 
PipelineStagePlugin { case class ConsoleLoadStage( plugin: ConsoleLoad, - name: String, - description: Option[String], - inputView: String, - outputMode: OutputModeType, + name: String, + description: Option[String], + inputView: String, + outputMode: OutputModeType, params: Map[String, String] ) extends PipelineStage { @@ -386,13 +381,13 @@ object ConsoleLoadStage { def execute(stage: ConsoleLoadStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = { - val df = spark.table(stage.inputView) + val df = spark.table(stage.inputView) if (!df.isStreaming) { throw new Exception("ConsoleLoad can only be executed in streaming mode.") with DetailException { - override val detail = stage.stageDetail + override val detail = stage.stageDetail } - } + } df.writeStream .format("console") @@ -431,7 +426,7 @@ To execute: The inbuilt [Spark SQL Functions](https://spark.apache.org/docs/latest/api/sql/index.html) are heavily optimised by the internal Spark code to a level which custom User Defined Functions cannot be (byte code) - so where possible it is better to use the inbuilt functions. {{}} -`User Defined Functions` allow users to extend the [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect. +`User Defined Functions` allow users to extend the [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect. Arc already includes [some addtional functions](partials/#user-defined-functions) which are not included in the base [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) dialect so any useful generic functions can be included in the [Arc repository](https://github.com/tripl-ai/arc) so that others can benefit. diff --git a/docs/plugins/index.html b/docs/plugins/index.html index f91bd726..91bd110f 100644 --- a/docs/plugins/index.html +++ b/docs/plugins/index.html @@ -532,7 +532,7 @@

Examples

val invalidKeys = checkValidKeys(c)(expectedKeys) (returnName, lagDays, leadDays, formatter, currentDate, invalidKeys) match { - case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) => + case (Right(returnName), Right(lagDays), Right(leadDays), Right(formatter), Right(currentDate), Right(invalidKeys)) => val res = (lagDays * -1 to leadDays).map { v => formatter.format(currentDate.plusDays(v)) @@ -547,8 +547,8 @@

Examples

val err = StageError(index, this.getClass.getName, c.origin.lineNumber, allErrors) Left(err :: Nil) } - } - + } + def parseFormatter(path: String)(formatter: String)(implicit c: Config): Either[Errors, DateTimeFormatter] = { def err(lineNumber: Option[Int], msg: String): Either[Errors, DateTimeFormatter] = Left(ConfigError(path, lineNumber, msg) :: Nil) @@ -557,7 +557,7 @@

Examples

} catch { case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage) } - } + } def parseCurrentDate(path: String, formatter: DateTimeFormatter)(value: String)(implicit c: Config): Either[Errors, LocalDate] = { def err(lineNumber: Option[Int], msg: String): Either[Errors, LocalDate] = Left(ConfigError(path, lineNumber, msg) :: Nil) @@ -567,7 +567,7 @@

Examples

} catch { case e: Exception => err(Some(c.getValue(path).origin.lineNumber()), e.getMessage) } - } + } } @@ -619,14 +619,11 @@

Examples

package ai.tripl.arc.plugins.lifecycle
 
-import java.util
-
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import ai.tripl.arc.api.API._
 import ai.tripl.arc.plugins.LifecyclePlugin
 import ai.tripl.arc.util.Utils
-import ai.tripl.arc.util.log.logger.Logger
 import ai.tripl.arc.config.Error._
 
 class DataFramePrinter extends LifecyclePlugin {
@@ -644,7 +641,7 @@ 

Examples

val invalidKeys = checkValidKeys(c)(expectedKeys) (numRows, truncate, invalidKeys) match { - case (Right(numRows), Right(truncate), Right(invalidKeys)) => + case (Right(numRows), Right(truncate), Right(invalidKeys)) => Right(DataFramePrinterInstance( plugin=this, numRows=numRows, @@ -664,30 +661,28 @@

Examples

truncate: Boolean ) extends LifecyclePluginInstance { - override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { - logger.trace() + override def before(stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { + logger.trace() .field("event", "before") .field("stage", stage.name) - .log() + .log() } - override def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { - logger.trace() + override def after(result: Option[DataFrame], stage: PipelineStage, index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) { + logger.trace() .field("event", "after") .field("stage", stage.name) - .field("isLast", java.lang.Boolean.valueOf(isLast)) - .log() + .log() result match { case Some(df) => df.show(numRows, truncate) case None => } - } + } } -
-

The plugin then needs to be registered by adding the full plugin name must be listed in your project’s /resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin file.

+

The plugin then needs to be registered by listing the full plugin name in your project’s /resources/META-INF/services/ai.tripl.arc.plugins.LifecyclePlugin file.

To execute:

@@ -727,7 +722,7 @@
Since: 1.3.0

Ensure that any stage with a mapPartitions or map DataFrame does not require the PipelineStage instance to be passed into the map function. So instead of doing something like:

val transformedDF = try {
-  df.mapPartitions[TransformedRow] { partition: Iterator[Row] => 
+  df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
     val uri = stage.uri.toString
 
@@ -736,7 +731,7 @@
Since: 1.3.0
val stageUri = stage.uri
 
 val transformedDF = try {
-  df.mapPartitions[TransformedRow] { partition: Iterator[Row] => 
+  df.mapPartitions[TransformedRow] { partition: Iterator[Row] =>
     val uri = stageUri.toString
 
@@ -757,10 +752,10 @@

Examples

val inputView = getValue[String]("inputView") val outputMode = getValue[String]("outputMode", default = Some("Append"), validValues = "Append" :: "Complete" :: "Update" :: Nil) |> parseOutputModeType("outputMode") _ val params = readMap("params", c) - val invalidKeys = checkValidKeys(c)(expectedKeys) + val invalidKeys = checkValidKeys(c)(expectedKeys) (name, description, inputView, outputMode, invalidKeys) match { - case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) => + case (Right(name), Right(description), Right(inputView), Right(outputMode), Right(invalidKeys)) => val stage = ConsoleLoadStage( plugin=this, name=name, @@ -770,8 +765,8 @@

Examples

params=params ) - stage.stageDetail.put("inputView", stage.inputView) - stage.stageDetail.put("outputMode", stage.outputMode.sparkString) + stage.stageDetail.put("inputView", stage.inputView) + stage.stageDetail.put("outputMode", stage.outputMode.sparkString) stage.stageDetail.put("params", params.asJava) Right(stage) @@ -787,10 +782,10 @@

Examples

case class ConsoleLoadStage( plugin: ConsoleLoad, - name: String, - description: Option[String], - inputView: String, - outputMode: OutputModeType, + name: String, + description: Option[String], + inputView: String, + outputMode: OutputModeType, params: Map[String, String] ) extends PipelineStage { @@ -804,13 +799,13 @@

Examples

def execute(stage: ConsoleLoadStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = { - val df = spark.table(stage.inputView) + val df = spark.table(stage.inputView) if (!df.isStreaming) { throw new Exception("ConsoleLoad can only be executed in streaming mode.") with DetailException { - override val detail = stage.stageDetail + override val detail = stage.stageDetail } - } + } df.writeStream .format("console")