Bluegreen #7
base: master
Conversation
I think there is too much that needs changing, from minor things (camel-case names) to larger questions about what should be configurable and how best to configure it (the percentage-splitting logic). So I'm not going to merge this branch until after the Pipelines team reviews the current master branch.
 * Actor that handles messages to update a splitting policy and to split input according to the percentages.
 * @param label used for identifying the app, e.g., as part of a file name for persistence of the split policy.
 */
class DataSplittingActor(
Why was this done with an actor when FlowOps.groupBy would be sufficient? https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Source.html#groupBy[K](maxSubstreams:Int,f:Out=%3EK):akka.stream.scaladsl.SubFlow[Out,Mat,FlowOps.this.Repr,FlowOps.this.Closed]
Following the same paradigm that I used before: I am using actors everywhere there is stateful execution. In this case it's the configuration.
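To make the `groupBy` suggestion concrete: the percentage-splitting decision itself is a pure function, which could serve as the `f: Out => K` key function for `groupBy` without any actor. This is an illustrative sketch only — `SplitPercentages` and `bucketFor` are hypothetical names, not types from the PR.

```scala
import scala.util.Random

// Hypothetical: the fraction of traffic routed to the new (canary) model.
final case class SplitPercentages(toNewModel: Int) {
  require(toNewModel >= 0 && toNewModel <= 100, "percentage must be in [0, 100]")
}

object PercentageSplitter {
  // Returns the substream key: 0 = current model, 1 = new (canary) model.
  def bucketFor(pct: SplitPercentages, rng: Random = new Random): Int =
    if (rng.nextInt(100) < pct.toNewModel) 1 else 0
}
```

With a function like this, the actor-free version of the splitter would be roughly `source.groupBy(2, _ => PercentageSplitter.bucketFor(pct))`, assuming the percentages are fixed for the lifetime of the stream.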
override def receive: PartialFunction[Any, Unit] = {

  // Process a new merger definition
  case speculativeStreamMerger: SpeculativeStreamMerger ⇒
Why is SpeculativeStreamMerger defined with Avro and all the overhead that goes with managing it as "state"? Isn't all you really need just configuration properties that are loaded at startup?
To support dynamic configuration?
protected val filePersistence = FilePersistence.apply(null)

protected var currentTransformer: Option[StreamSplitter] = None
It appears that StreamSplitter is streamed in. That adds a lot of code to support runtime configurability, and I'm not so sure it's worth it. Isn't reading the values from the configuration at startup sufficient?
What happens if I don't update the values for weeks? Does Akka Streams have a timeout that assumes the stream died? If I really wanted the percentages to be configurable, wouldn't it be simpler to have a web service in the streamlet where I can send commands to update the values?
The idea was to be able to configure everything to avoid restarting.
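For comparison, the load-at-startup alternative discussed above could look something like the following sketch. All names are hypothetical; a plain `Map` stands in for the streamlet's real configuration source.

```scala
// Hypothetical sketch: load the canary split percentage once at startup
// instead of streaming SpeculativeStreamMerger state into the actor.
object SplitterConfig {
  val DefaultPercentage = 10 // illustrative default: 10% of traffic to the canary

  // In a real streamlet this would read the deployment's configuration;
  // here a Map stands in for it. Invalid or out-of-range values fall back
  // to the default rather than failing the stream.
  def load(settings: Map[String, String]): Int =
    settings.get("splitter.canary-percentage")
      .flatMap(s => scala.util.Try(s.toInt).toOption)
      .filter(p => p >= 0 && p <= 100)
      .getOrElse(DefaultPercentage)
}
```

This trades dynamic reconfiguration for far less code; changing the percentage then requires a redeploy, which is the crux of the disagreement in this thread.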
val probs = prediction.classProbabilities
val probability = if (probs.length == 2) probs(1) else 0.0
(prediction.label, probability)
try {
I'm not sure how an exception can be thrown in this case, but if BinomialModelPrediction can throw one here, what does that mean? Should we log an error or something rather than silently return a default value?
If prediction is null, then it barfs. I did not trace it far enough to see how this can happen, but I did see it happen.
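One way to make the null case explicit and log it, rather than silently falling back to a default, is sketched below. The prediction type here is a stub mirroring only the fields used in the PR, not the real H2O class.

```scala
// Stub standing in for the real binomial prediction type; it only mirrors
// the two fields the PR code reads (label, classProbabilities).
final case class BinomialPredictionStub(label: String, classProbabilities: Array[Double])

object PredictionOps {
  def labelAndProbability(prediction: BinomialPredictionStub): (String, Double) =
    Option(prediction) match {
      case Some(p) =>
        val probs = p.classProbabilities
        val probability = if (probs.length == 2) probs(1) else 0.0
        (p.label, probability)
      case None =>
        // Surface the anomaly instead of hiding it behind a default value.
        System.err.println("Received null prediction; returning default score")
        ("unknown", 0.0)
    }
}
```

Wrapping the nullable value in `Option(...)` also removes the need for the `try` block around the field accesses.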
/**
 * Save the state to a file system.
 * Restore the state of a splitter from a file system. Use [[stateExists]] first to determine
FilePersistence is meant to be opaque to model details. Please don't add this logic here.
protected def startFlow =
  FlowWithPipelinesContext[StartSpeculative].mapAsync(1) {
    descriptor ⇒ splieeterCollector.ask(descriptor).mapTo[Done]
Spelling
def makeSource(frequency: FiniteDuration = 5.millisecond): Source[Long, NotUsed] =
  Source.repeat(1L)
    .throttle(1, frequency)
Magic numbers...
@@ -0,0 +1,68 @@
package pipelines.examples.modelserving.speculative
I saw the same code above? Copy-paste error?
They are in different modules, so yes.
override def decideResult(results: List[WineResult]): WineResult = {

  results.size match {
    case 0 ⇒ // No results, can only happen if we timed out
This is why your Decider trait must return an Either[String, ...], with the String holding an error message. I don't care if this is example code; the Decider is intended for the library.
Sorry, what's wrong with this?
You are returning a bogus record. If I wrote a production application with this logic, would it be acceptable to return a bogus, made-up record and also not log an error? I don't think it would be acceptable. That's why this function needs to return an Either, and the error case needs to be handled higher in the stack.
Wait, the decider is doing the right thing. If there are no results, the only thing it can do is return an error saying everything timed out, which it does. If all results are errors, that error is returned, and only if we have at least one result does it calculate a result.
There is no higher-level thing; the decider is the master.
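The `Either`-based shape being proposed can be sketched as follows. The result type is a stub standing in for `WineResult`, and `MaxQualityDecider` is an illustrative implementation, not code from the PR.

```scala
// Stub standing in for WineResult; only the fields the sketch needs.
final case class ResultStub(quality: Double, isError: Boolean = false)

// Hypothetical Decider trait returning Either instead of fabricating a record.
trait EitherDecider[R] {
  def decideResult(results: List[R]): Either[String, R]
}

object MaxQualityDecider extends EitherDecider[ResultStub] {
  def decideResult(results: List[ResultStub]): Either[String, ResultStub] =
    results.filterNot(_.isError) match {
      case Nil if results.isEmpty => Left("No results: all model invocations timed out")
      case Nil                    => Left("All model invocations returned errors")
      case good                   => Right(good.maxBy(_.quality))
    }
}
```

The caller (e.g., the merging stage) then decides whether to log, drop, or emit a dead-letter record for the `Left` case, instead of a made-up record flowing downstream undetected.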
import pipelines.examples.modelserving.winequality.data.WineResult
import pipelines.examples.modelserving.winequality.speculative.WineResultRun

class WineSpeculativeRecordSplitter extends SpeculativeRecordSplitter[WineResult] {
Now I really don't understand what this *Splitter abstraction is for. This code seems to be boilerplate that doesn't do anything.
Oh, we do need to split the record into UUID and base
It's a generic implementation that splits a record with a UUID into the UUID and the record.
Why introduce a whole class to do such a trivial operation? It's not even very type safe (the source argument).
This goes back to inheritance in Avro. What is a generic way to split a record before knowing its type? Dealing with Avro is a pain.
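If the UUID tagging lived outside the Avro-generated classes, the splitting could be one generic function instead of a class per record type. This is a sketch under that assumption; `Tagged` and `RecordSplitter` are illustrative names, not from the PR.

```scala
// Hypothetical wrapper pairing a UUID with any payload record, avoiding
// Avro inheritance entirely.
final case class Tagged[R](uuid: String, record: R)

object RecordSplitter {
  // Generic over the payload type, so one implementation serves all records,
  // and the result is fully type safe.
  def split[R](tagged: Tagged[R]): (String, R) = (tagged.uuid, tagged.record)
}
```

If the Avro-generated classes cannot share such a wrapper, a small type class with one instance per record type would keep the splitting generic while staying type safe.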
@deanwampler @blublinsky I suggest big PRs (if not broken down into smaller ones) have: a) a design doc, b) test coverage, c) proper docs for the novice user like Seldon has (https://docs.seldon.io/projects/seldon-core/en/latest/examples/istio.html), and d) a proper description. Going through Seldon and other projects, I am curious why we re-invent the wheel at the application layer while people try to solve this at the K8s layer: kubeflow/kubeflow#667
Extension adding canary testing and speculative model serving