Skip to content
This repository was archived by the owner on Feb 20, 2023. It is now read-only.

Conversation

@blublinsky
Copy link

Extension adding canary testing and speculative model serving

Copy link
Contributor

@deanwampler deanwampler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is too much that needs changing, minor things (camel-case names) to larger questions about what should be configurable and how best to configurable (percentage splitting logic). So, I'm not going to merge this branch until after the Pipelines team reviews the current master branch.

* Actor that handles messages to update a splitting policy and to split input according to the percentages.
* @param label used for identifying the app, e.g., as part of a file name for persistence of the split policy.
*/
class DataSplittingActor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following the same paradigm that I used before. I am using actors everywhere when there is a stateful execution. In this case its configuration

override def receive: PartialFunction[Any, Unit] = {

// Process a new merger definition
case speculativeStreamMerger: SpeculativeStreamMerger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is SpeculativeStreamMerger defined with Avro and all the overhead that goes with managing it as "state"? Isn't all you really need just configuration properties that are loaded at startup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support dynamic configuration?


protected val filePersistence = FilePersistence.apply(null)

protected var currentTransformer: Option[StreamSplitter] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that StreamSplitter is streamed in. That adds a lot of code to support runtime configurability and hence I'm not so sure it's worth it. Isn't reading the values at the beginning from the configuration sufficient?

What happens if I don't update the values in weeks? Does Akka Streams have a timeout that assumes the stream died? If I really wanted the percentages to be configurable, wouldn't it be simpler to have a web service in the streamlet where I can send commands to update the values?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to be able to configure everything to avoid restarting

val probs = prediction.classProbabilities
val probability = if (probs.length == 2) probs(1) else 0.0
(prediction.label, probability)
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how an exception can be thrown in this case, but if BinomialModelPrediction can throw one here, what does that mean? Should we log an error or something rather than silently return a default value?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If prediction is null, then it barfs. I did not trace if fr enough to see how this can happen, but did see it.


/**
* Save the state to a file system.
* Restore the state of a splitter from a file system. Use [[stateExists]] first to determine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FilePersistence is meant to be opaque to model details. Please don't add this logic here.


protected def startFlow =
FlowWithPipelinesContext[StartSpeculative].mapAsync(1) {
descriptor splieeterCollector.ask(descriptor).mapTo[Done]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling


def makeSource(frequency: FiniteDuration = 5.millisecond): Source[Long, NotUsed] =
Source.repeat(1L)
.throttle(1, frequency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic numbers...

@@ -0,0 +1,68 @@
package pipelines.examples.modelserving.speculative
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the same code above? Copy-paste error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are in different modules, so yes

override def decideResult(results: List[WineResult]): WineResult = {

results.size match {
case 0 // No results, can only happen if we timed out
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why your Decider trait must return an Either[String,...], with the string holding an error message. I don't care if this example code here, the Decider is intended for the library.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, whats wrong with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are returning a bogus record. If I wrote a production application with this logic, would it be acceptable to return a bogus, made-up record and also not log an error? I don't think it would be acceptable. That's why this function needs to return Either and the error case needs to handled higher in the stack.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, decider is doing the right thing. If there is no results, the only thing that it can do is to return an error that everything timed out, which it does. If all results are errors, this error is returned, and only if we have at least one result, then it will calculate result

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no higher level thing - the decider is the master

import pipelines.examples.modelserving.winequality.data.WineResult
import pipelines.examples.modelserving.winequality.speculative.WineResultRun

class WineSpeculativeRecordSplitter extends SpeculativeRecordSplitter[WineResult] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I really don't understand what this *Splitter abstraction is for. This code seems to be boilerplate that doesn't do anything.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we do need to split the record into UUID and base

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a generic implementation that splits record with UUID into UUID and record

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why introduce a whole class to do such a trivial operation? It's not even very type safe (the source argument).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes back to to inheritance in Avro. What is a generic way to split a record before knowing its type. Dealing with Avro is a bitch

@skonto
Copy link

skonto commented Oct 3, 2019

@deanwampler @blublinsky I suggest big PRS (if not broken down to smaller ones) to have: a) design doc b) test coverage c) proper docs for the novice user like seldon has (https://docs.seldon.io/projects/seldon-core/en/latest/examples/istio.html) e) proper description

Going through seldon and other stuff... I am curious why we re-invent the wheel at the application layer while people try to solve this at the K8s layer? kubeflow/kubeflow#667

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants