lifecycleplugin api change
seddonm1 committed Sep 19, 2019
1 parent e746838 commit 573f7b3
Showing 9 changed files with 85 additions and 29 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
 ## Change Log
 
+# 2.1.0
+
+- add missing types `BooleanList`, `Double`, `DoubleList`, `LongList` to config reader.
+
+**BREAKING**
+- change API for `LifecyclePlugin` to pass the stage index and the full job pipeline so that the current and other stages can be accessed in the plugin.
+
 # 2.0.1
 
 - update to [Spark 2.4.4](https://spark.apache.org/releases/spark-release-2-4-4.html).
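For plugin authors the migration is mechanical. A minimal before/after sketch of a hypothetical `LifecyclePluginInstance` (the method body is illustrative, not part of this commit):

    // old API (2.0.x): only the current stage is visible
    override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
      logger.trace().field("stage", stage.name).log()
    }

    // new API (2.1.0): the stage index and the full pipeline are passed instead
    override def before(index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
      val stage = stages(index) // the stage about to run
      logger.trace().field("stage", stage.name).log()
    }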
34 changes: 21 additions & 13 deletions src/main/scala/ai/tripl/arc/ARC.scala
@@ -404,38 +404,46 @@ object ARC {
   def run(pipeline: ETLPipeline)
     (implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {
 
-    def before(stage: PipelineStage): Unit = {
+    def before(index: Int, stages: List[PipelineStage]): Unit = {
       for (p <- arcContext.activeLifecyclePlugins) {
         logger.trace().message(s"Executing before() on LifecyclePlugin: ${p.getClass.getName}")
-        p.before(stage)
+        p.before(index, stages)
       }
     }
 
-    def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean): Unit = {
+    def after(currentValue: Option[DataFrame], index: Int, stages: List[PipelineStage]): Unit = {
       for (p <- arcContext.activeLifecyclePlugins) {
-        logger.trace().message(s"Executing after(last = $isLast) on LifecyclePlugin: ${p.getClass.getName}")
-        p.after(stage, result, isLast)
+        logger.trace().message(s"Executing after() on LifecyclePlugin: ${p.getClass.getName}")
+        p.after(currentValue, index, stages)
       }
     }
 
     @tailrec
-    def runStages(stages: List[PipelineStage]): Option[DataFrame] = {
+    def runStages(stages: List[(PipelineStage, Int)]): Option[DataFrame] = {
       stages match {
         case Nil => None // end
         case head :: Nil =>
-          before(head)
-          val result = processStage(head)
-          after(head, result, true)
+          val (stage, index) = head
+          // pass the full pipeline rather than the remaining stages so that
+          // plugins can index into it with the global stage index
+          before(index, pipeline.stages)
+          val result = processStage(stage)
+          after(result, index, pipeline.stages)
           result
 
+        // after receives its arguments in reduce-callback order: currentValue[, index[, array]]
         case head :: tail =>
-          before(head)
-          val result = processStage(head)
-          after(head, result, false)
+          val (stage, index) = head
+          before(index, pipeline.stages)
+          val result = processStage(stage)
+          after(result, index, pipeline.stages)
           runStages(tail)
       }
     }
 
-    runStages(pipeline.stages)
+    runStages(pipeline.stages.zipWithIndex)
   }
 
   def processStage(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext): Option[DataFrame] = {
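The runner now pairs each stage with its position exactly once, up front, so every hook receives a stable, global index alongside the complete pipeline. The same traversal pattern in isolation (illustrative names only, not Arc API):

    // zipWithIndex attaches a stable index to each element; recursing over the
    // zipped list preserves that index even as the list shrinks
    val stages = List("extract", "transform", "load")
    stages.zipWithIndex.foreach { case (stage, index) =>
      println(s"stage $index of ${stages.length}: $stage") // e.g. stage 0 of 3: extract
    }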
4 changes: 2 additions & 2 deletions src/main/scala/ai/tripl/arc/api/API.scala
@@ -258,9 +258,9 @@ object API {
 
   def plugin: LifecyclePlugin
 
-  def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext)
+  def before(index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext)
 
-  def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext)
+  def after(currentValue: Option[DataFrame], index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext)
 
 }

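Everything the old hooks received explicitly can still be derived from the new parameters; a sketch of an `after` implementation (the derived values are illustrative):

    override def after(currentValue: Option[DataFrame], index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
      val stage = stages(index)               // the stage that just ran (the old stage parameter)
      val isLast = index == stages.length - 1 // replaces the removed isLast parameter
      val next = stages.lift(index + 1)       // other stages are now reachable as well
    }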
40 changes: 36 additions & 4 deletions src/main/scala/ai/tripl/arc/config/ConfigReader.scala
@@ -93,6 +93,14 @@ object ConfigReader {
 
   }
 
+  implicit object IntConfigReader extends ConfigReader[Int] {
+
+    val expectedType = "int"
+
+    def read(path: String, c: Config): Int = c.getInt(path)
+
+  }
+
   implicit object IntListConfigReader extends ConfigReader[IntList] {
 
     val expectedType = "integer array"
@@ -109,13 +117,13 @@
 
   }
 
-  implicit object IntConfigReader extends ConfigReader[Int] {
+  implicit object BooleanListConfigReader extends ConfigReader[BooleanList] {
 
-    val expectedType = "int"
+    val expectedType = "boolean array"
 
-    def read(path: String, c: Config): Int = c.getInt(path)
+    def read(path: String, c: Config): BooleanList = c.getBooleanList(path).asScala.map(f => f.booleanValue).toList
 
   }
 
   implicit object LongConfigReader extends ConfigReader[Long] {
 
@@ -125,6 +133,30 @@
 
   }
 
+  implicit object LongListConfigReader extends ConfigReader[LongList] {
+
+    val expectedType = "long array"
+
+    def read(path: String, c: Config): LongList = c.getLongList(path).asScala.map(f => f.toLong).toList
+
+  }
+
+  implicit object DoubleConfigReader extends ConfigReader[Double] {
+
+    val expectedType = "double"
+
+    def read(path: String, c: Config): Double = c.getDouble(path)
+
+  }
+
+  implicit object DoubleListConfigReader extends ConfigReader[DoubleList] {
+
+    val expectedType = "double array"
+
+    def read(path: String, c: Config): DoubleList = c.getDoubleList(path).asScala.map(f => f.toDouble).toList
+
+  }
+
   def getValue[A](path: String, default: Option[A] = None, validValues: Seq[A] = Seq.empty)(implicit c: Config, reader: ConfigReader[A]): Either[Errors, A] = {
     reader.getValue(path, c, default, validValues)
   }
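Each reader is an implicit typeclass instance resolved by `getValue`. A sketch of reading one of the newly supported types, assuming a hypothetical `thresholds` key in a Typesafe Config fragment:

    import com.typesafe.config.{Config, ConfigFactory}

    // hypothetical job configuration fragment
    implicit val c: Config = ConfigFactory.parseString("""thresholds = [0.25, 0.5, 0.75]""")

    // DoubleListConfigReader is summoned for the DoubleList alias;
    // the result is Either[Errors, DoubleList]
    val thresholds = ConfigReader.getValue[DoubleList]("thresholds")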
8 changes: 7 additions & 1 deletion src/main/scala/ai/tripl/arc/config/package.scala
@@ -2,8 +2,14 @@ package ai.tripl.arc
 
 package object config {
 
-  type StringList = List[String]
+  type DoubleList = List[Double]
 
+  type BooleanList = List[Boolean]
+
   type IntList = List[Int]
 
+  type LongList = List[Long]
+
+  type StringList = List[String]
 
 }
@@ -42,21 +42,22 @@ case class DataFramePrinterInstance(
   truncate: Boolean
 ) extends LifecyclePluginInstance {
 
-  override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+  override def before(index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+    val stage = stages(index)
     logger.trace()
       .field("event", "before")
       .field("stage", stage.name)
       .log()
   }
 
-  override def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+  override def after(currentValue: Option[DataFrame], index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+    val stage = stages(index)
     logger.trace()
       .field("event", "after")
       .field("stage", stage.name)
-      .field("isLast", java.lang.Boolean.valueOf(isLast))
       .log()
 
-    result match {
+    currentValue match {
       case Some(df) => df.show(numRows, truncate)
       case None =>
     }
@@ -54,7 +54,7 @@ class LifecyclePluginSuite extends FunSuite with BeforeAndAfter {
     val expectedBefore = Seq(("delimited extract", "before", "testValue")).toDF("stage","when","message")
     assert(TestUtils.datasetEquality(expectedBefore, spark.table("before")))
 
-    val expectedAfter = Seq(("delimited extract", "after", "testValue", 1L, true)).toDF("stage","when","message","count","isLast")
+    val expectedAfter = Seq(("delimited extract", "after", "testValue", 1L)).toDF("stage","when","message","count")
     assert(TestUtils.datasetEquality(expectedAfter, spark.table("after")))
   }

8 changes: 5 additions & 3 deletions src/test/scala/ai/tripl/arc/plugins/TestLifecyclePlugin.scala
@@ -42,15 +42,17 @@ case class TestLifecyclePluginInstance(
   key: String
 ) extends LifecyclePluginInstance {
 
-  override def before(stage: PipelineStage)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+  override def before(index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
     import spark.implicits._
+    val stage = stages(index)
     val df = Seq((stage.name, "before", this.key)).toDF("stage","when","message")
     df.createOrReplaceTempView("before")
   }
 
-  override def after(stage: PipelineStage, result: Option[DataFrame], isLast: Boolean)(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
+  override def after(currentValue: Option[DataFrame], index: Int, stages: List[PipelineStage])(implicit spark: SparkSession, logger: ai.tripl.arc.util.log.logger.Logger, arcContext: ARCContext) {
     import spark.implicits._
-    val df = Seq((stage.name, "after", this.key, result.get.count, isLast)).toDF("stage","when","message","count","isLast")
+    val stage = stages(index)
+    val df = Seq((stage.name, "after", this.key, currentValue.get.count)).toDF("stage","when","message","count")
     df.createOrReplaceTempView("after")
   }
 }
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version := "2.0.1"
version := "2.1.0"
