This repository was archived by the owner on Feb 20, 2023. It is now read-only.
43 changes: 37 additions & 6 deletions README.md
@@ -1,10 +1,12 @@
# Pipelines Machine Learning Examples

This project contains three example pipelines:
This project contains several example pipelines:

1. Judge the quality of wine using models that are served within a streamlet process.
2. Make product recommendations using models that are served _as a service_ using Kubeflow.
3. Predict air traffic delays using an H2O embedded "MOJO" model.
4. Wine quality canary deployment, which lets you specify the percentage of traffic sent to each model-serving implementation.
5. Speculative wine quality deployment, which serves several models in parallel and picks a result based on the individual results.
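
To make the last two items concrete, here is a minimal Scala sketch of the two routing ideas. It is an illustration only, not the code used in these examples: `DeploymentSketch`, `canaryTarget`, `Scored`, and `pickSpeculative` are hypothetical names, and choosing the highest-confidence result is just one assumed selection rule.

```scala
object DeploymentSketch {

  // Canary: send a record to one of two model servers according to a
  // traffic percentage. `roll` is a number in [0, 100), for example
  // scala.util.Random.nextInt(100).
  def canaryTarget(percentToFirst: Int, roll: Int): String =
    if (roll < percentToFirst) "model-serving1" else "model-serving2"

  // Speculative: several models score the same record in parallel.
  // `confidence` is an assumed field used only for this illustration.
  final case class Scored(model: String, quality: Double, confidence: Double)

  // Pick the result with the highest confidence, or None if no model answered.
  def pickSpeculative(results: Seq[Scored]): Option[Scored] =
    results.reduceOption((a, b) => if (b.confidence > a.confidence) b else a)
}
```

With `percentToFirst = 20`, roughly one record in five would go to the first server when `roll` is drawn uniformly at random.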

In addition, it contains prototypes for reusable "contrib" libraries for Pipelines:

@@ -137,13 +139,15 @@ Then you can use created service to access it

If you run any of the following commands in the "root" project (`pipelines-model-serving`), you'll get errors about multiple blueprint files being disallowed by Pipelines.

So, decide which of the three projects you want to build and deploy, then change to that project in `sbt` and run `buildAndPublish`.
So, decide which of the five projects you want to build and deploy, then change to that project in `sbt` and run `buildAndPublish`.

Specifically, from the `sbt` prompt, do _one_ of the following first:

1. Wine quality: `project wineModelServingPipeline` (corresponding to the directory `wine-quality-ml`)
2. Airline flights: `project airlineFlightsModelServingPipeline` (corresponding to the directory `airline-flights-ml`)
3. Recommender: `project recommenderModelServingPipeline` (corresponding to the directory `recommender-ml`)
4. Canary deployment: `project wineModelServingBlueGreenPipeline` (corresponding to the directory `wine-quality-ml_bluegreen`)
5. Speculative service deployment: `project wineModelServingSpeculativePipeline` (corresponding to the directory `wine-quality-ml_speculative`)

Now build the project:

@@ -158,19 +162,44 @@ The image name will be based on one of the following strings, where `USER` will
* Wine app: `wine-quality-ml-USER`
* Airline app: `airline-flights-ml-USER`
* Recommender app: `recommender-ml-USER`
* Canary deployment app: `wine-quality-bluegreen-ml-USER`
* Speculative serving app: `wine-quality-speculative-ml-USER`

Note that the current implementations use file-based persistence. This means that, before deploying an application, you must create a PVC (PersistentVolumeClaim) for it to use. The PVC must be created
in the namespace where the application is deployed (which corresponds to the application name) and must support RWX (`ReadWriteMany`) access (use the `glusterfs-storage` storage class on OpenShift).
You can create the PVC either directly in the OpenShift console or with the following YAML file:
````
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: persistence-data-mount # Choose a different name if needed
spec:
  storageClassName: glusterfs-storage
  accessModes:
    - "ReadWriteMany"
  resources:
    requests:
      storage: "10Gi"
````

The full image identifier is printed as part of the output of the `buildAndPublish` command. It includes the Docker registry URL for your cluster and the auto-generated tag for the image. Copy and paste that text into the deployment command below, replacing the placeholder `IMAGE`. Note: this command uses `kubectl`, so run it in a separate shell window:
```shell
kubectl pipelines deploy IMAGE
kubectl pipelines deploy IMAGE --volume-mount model-serving.persistence-data-mount=persistence-data-mount
```
> NOTE: The volume mount must be defined for every streamlet that uses this persistence. For example, the canary deployment example contains three streamlets that use persistence, so its deployment command looks like the following:
````
kubectl pipelines deploy -u $(oc whoami) -p $(oc whoami -t) docker-registry-default.fiorano.lightbend.com/lightbend/wine-quality-bluegreen-ml-boris:193-9cb5dfe --volume-mount model-serving1.persistence-data-mount=persistence-data-mount --volume-mount model-serving2.persistence-data-mount=persistence-data-mount --volume-mount winedata-splitter.persistence-data-mount=persistence-data-mount
````
> Here, `-u $(oc whoami) -p $(oc whoami -t)` is optional; it ensures that the login to the registry is correct.
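
The repeated `--volume-mount` flags follow a regular pattern, so they can be generated. The following Scala sketch assembles the command line from a list of streamlet names. `DeployCommandSketch` and its methods are hypothetical helpers written for this illustration; only the flag format and the streamlet names come from the commands shown above.

```scala
object DeployCommandSketch {

  // One `--volume-mount <streamlet>.<mount>=<pvc>` pair per streamlet.
  def volumeMountArgs(streamlets: Seq[String], mountName: String, pvcName: String): Seq[String] =
    streamlets.flatMap(s => Seq("--volume-mount", s"$s.$mountName=$pvcName"))

  // Full command line as a single string. The defaults mirror the mount and
  // PVC names used in this README.
  def deployCommand(
      image: String,
      streamlets: Seq[String],
      mountName: String = "persistence-data-mount",
      pvcName: String = "persistence-data-mount"): String =
    (Seq("kubectl", "pipelines", "deploy", image) ++
      volumeMountArgs(streamlets, mountName, pvcName)).mkString(" ")
}
```

For the canary example, the streamlet list would be `Seq("model-serving1", "model-serving2", "winedata-splitter")`.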

> NOTE: If you are on OpenShift and prefer the `oc` command, replace `kubectl` with `oc plugin`.

For the airline and wine apps, you can also override InfluxDB parameters on the command line (or any other configuration parameters, really). For the wine app, it would look as follows, where any or all of the configuration flags could be given. Here, the default values are shown on the right hand sides of the equal signs:

```shell
kubectl pipelines deploy IMAGE \
kubectl pipelines deploy IMAGE --volume-mount model-serving.persistence-data-mount=persistence-data-mount \
wine-quality.influxdb.host="influxdb.influxdb.svc" \
wine-quality.influxdb.port="8086" \
wine-quality.influxdb.database="wine_ml"
@@ -179,7 +208,7 @@ kubectl pipelines deploy IMAGE \
Similarly, for the airline app:

```shell
kubectl pipelines deploy IMAGE \
kubectl pipelines deploy IMAGE --volume-mount model-serving.persistence-data-mount=persistence-data-mount \
airline-flights.influxdb.host="influxdb.influxdb.svc" \
airline-flights.influxdb.port="8086" \
airline-flights.influxdb.database="airline_ml"
@@ -205,6 +234,8 @@ avroSpecificSourceDirectories in Compile ++=
```

So, when that project's `*.avsc` files are parsed, the shared files in `model-serving` will also be parsed, _again_, and the output code will be compiled into that project's jar file. This means that when the app is deployed, there will be _two_ copies of the class files for these shared classes. This is "safe", because the classes are identical, but not very "clean". Hence, a future version of this code will need to eliminate this duplication.
An additional issue is that the Avro generator might add unused imports corresponding to the included
Avro schemas, which is why the project disables `-Xfatal-warnings`.
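
One way to switch that flag off in an sbt build is sketched below. This is an assumption about how such a setting could look, not a copy of this project's build definition.

```scala
// build.sbt fragment (hypothetical; not taken from this project's build).
// Removes -Xfatal-warnings so that unused-import warnings in the generated
// Avro code do not fail compilation. All other compiler options are kept.
scalacOptions ~= (_.filterNot(_ == "-Xfatal-warnings"))
```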

### Ingress with "Canned" Data

@@ -59,7 +59,7 @@ airline-flights : {

// If you use the InfluxDB Egresses:
influxdb : {
host : "influxdb.influxdb.svc",
host : "influxdb-influxdb.influxdb.svc.cluster.local",
port : 8086,
database : "airline_ml"
}
@@ -14,7 +14,6 @@ import pipelinesx.config.ConfigUtil.implicits._

import com.lightbend.modelserving.model.{ ModelDescriptor, ModelType }
import com.lightbend.modelserving.model.ModelDescriptorUtil.implicits._
import com.lightbend.modelserving.model.util.ModelMainBase

/**
* Ingress of model updates. In this case, every two minutes we load and
@@ -29,7 +28,7 @@ final case object AirlineFlightModelIngress extends AkkaStreamlet {

override def createLogic = new RunnableGraphStreamletLogic() {
def runnableGraph =
AirlineFlightModelIngressUtil.makeSource().to(atMostOnceSink(out))
AirlineFlightModelIngressUtil.makeSource().to(plainSink(out))
}
}

@@ -43,8 +42,7 @@ protected final class ModelDescriptorProvider() {
val buffer = new Array[Byte](1024)
val content = new ByteArrayOutputStream()
Stream.continually(is.read(buffer)).takeWhile(_ != -1).foreach(content.write(buffer, 0, _))
val mojo = content.toByteArray
mojo
content.toByteArray
}

var count = -1
@@ -81,22 +79,3 @@ object AirlineFlightModelIngressUtil {
.throttle(1, frequency)
}
}

/**
* Test program for [[AirlineFlightModelIngress]] and [[AirlineFlightModelIngressUtil]].
* It reads models and prints their data. For testing purposes only.
* At this time, Pipelines intercepts calls to sbt run and sbt runMain, so use
* the console instead:
* ```
* import pipelines.examples.modelserving.airlineflights._
* AirlineFlightModelIngressMain.main(Array("-n","20","-f","1000"))
* ```
*/
object AirlineFlightModelIngressMain extends ModelMainBase(
defaultCount = 20,
defaultFrequencyMillis = AirlineFlightModelIngressUtil.modelFrequencySeconds * 1000) {

override protected def makeSource(frequency: FiniteDuration): Source[ModelDescriptor, NotUsed] =
AirlineFlightModelIngressUtil.makeSource(frequency)
}

@@ -0,0 +1,72 @@
package pipelines.examples.modelserving.airlineflights

import models.AirlineFlightH2OModelFactory
import com.lightbend.modelserving.model.actor.ModelServingActor
import com.lightbend.modelserving.model.{ Model, ModelDescriptor }
import com.lightbend.modelserving.model.h2o.H2OModel
import akka.Done
import akka.actor.{ ActorRef, ActorSystem }
import akka.pattern.ask
import akka.util.Timeout
import com.lightbend.modelserving.model.persistence.FilePersistence

import scala.concurrent.duration._
import pipelines.akkastream.AkkaStreamlet
import pipelines.akkastream.scaladsl.{ FlowWithOffsetContext, RunnableGraphStreamletLogic }
import pipelines.streamlets.{ ReadWriteMany, StreamletShape, VolumeMount }
import pipelines.streamlets.avro.{ AvroInlet, AvroOutlet }
import hex.genmodel.easy.prediction.BinomialModelPrediction
import pipelines.examples.modelserving.airlineflights.data.{ AirlineFlightRecord, AirlineFlightResult }
import pipelines.examples.modelserving.airlineflights.result.ModelLabelProbabilityResult

final case object AirlineFlightModelServer extends AkkaStreamlet {

  val in0 = AvroInlet[AirlineFlightRecord]("in-0")
  val in1 = AvroInlet[ModelDescriptor]("in-1")
  val out = AvroOutlet[AirlineFlightResult]("out", _.inputRecord.uniqueCarrier)
  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  // Declare the volume mount:

  private val persistentDataMount = VolumeMount("persistence-data-mount", "/data", ReadWriteMany)
  override def volumeMounts = Vector(persistentDataMount)

  implicit val askTimeout: Timeout = Timeout(30.seconds)

  /** Uses the actor system as an argument to support testing outside of the streamlet. */
  def makeModelServer(sys: ActorSystem): ActorRef = {

    sys.actorOf(
      ModelServingActor.props[AirlineFlightRecord, BinomialModelPrediction](
        "airlines", AirlineFlightH2OModelFactory, () ⇒ new BinomialModelPrediction))
  }

  override final def createLogic = new RunnableGraphStreamletLogic() {
    // Set persistence
    FilePersistence.setGlobalMountPoint(getMountedPath(persistentDataMount).toString)
    FilePersistence.setStreamletName(streamletRef)

    def runnableGraph() = {
      sourceWithOffsetContext(in1).via(modelFlow).runWith(sinkWithOffsetContext)
      sourceWithOffsetContext(in0).via(dataFlow).to(sinkWithOffsetContext(out))
    }

    val modelServer = makeModelServer(context.system)

    protected def dataFlow =
      FlowWithOffsetContext[AirlineFlightRecord].mapAsync(1) { record ⇒
        modelServer.ask(record).mapTo[Model.ModelReturn[BinomialModelPrediction]]
          .map { modelReturn ⇒
            val bmp: BinomialModelPrediction = modelReturn.modelOutput
            val (label, probability) = H2OModel.fromPrediction(bmp)
            AirlineFlightResult(
              modelResult = ModelLabelProbabilityResult(label, probability),
              modelResultMetadata = modelReturn.modelResultMetadata,
              inputRecord = record)
          }
      }

    protected def modelFlow =
      FlowWithOffsetContext[ModelDescriptor]
        .mapAsync(1) { descriptor ⇒ modelServer.ask(descriptor).mapTo[Done] }
  }
}
@@ -10,7 +10,6 @@ import pipelinesx.ingress.RecordsReader
import pipelinesx.config.ConfigUtil
import pipelinesx.config.ConfigUtil.implicits._
import scala.concurrent.duration._
import com.lightbend.modelserving.model.util.MainBase
import pipelines.examples.modelserving.airlineflights.data.AirlineFlightRecord

/**
@@ -25,7 +24,7 @@ final case object AirlineFlightRecordIngress extends AkkaStreamlet {

override final def createLogic = new RunnableGraphStreamletLogic {
def runnableGraph =
AirlineFlightRecordIngressUtil.makeSource().to(atMostOnceSink(out))
AirlineFlightRecordIngressUtil.makeSource().to(plainSink(out))
}
}

@@ -97,22 +96,3 @@ object AirlineFlightRecordIngressUtil {
}
}
}

/**
* Test program for [[AirlineFlightRecordIngress]] and [[AirlineFlightRecordIngressUtil]];
* reads records and prints them. For testing purposes only.
* At this time, Pipelines intercepts calls to sbt run and sbt runMain, so use
* the console instead:
* ```
* import pipelines.examples.modelserving.airlineflights._
* AirlineFlightRecordIngressMain.main(Array("-n","10","-f","1000"))
* ```
*/
object AirlineFlightRecordIngressMain extends MainBase[AirlineFlightRecord](
defaultCount = 10,
defaultFrequencyMillis = AirlineFlightRecordIngressUtil.dataFrequencyMilliseconds) {

override protected def makeSource(frequency: FiniteDuration): Source[AirlineFlightRecord, NotUsed] =
AirlineFlightRecordIngressUtil.makeSource(
AirlineFlightRecordIngressUtil.rootConfigKey, frequency)
}