-
Notifications
You must be signed in to change notification settings - Fork 266
IdentityKeyedProducer in ValueFlatMapToFlatMap Dag Optimizer rule #580
Conversation
@@ -291,7 +291,7 @@ trait DagOptimizer[P <: Platform[P]] { | |||
// TODO: we need to case class here to not lose the irreducible which may be named | |||
case ValueFlatMappedProducer(in, fn) => | |||
// we know that (K, V) <: T due to the case match, but scala can't see it | |||
def cast[K, V](p: Prod[(K, V)]): Prod[T] = p.asInstanceOf[Prod[T]] | |||
def cast[K, V](p: Prod[(K, V)]): Prod[T] = IdentityKeyedProducer[P, K, V](p).asInstanceOf[Prod[T]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is line 283 above going to have the same issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not understanding why this is an error. Can you post a gist to a stack trace that hit the issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the stack trace from the test if we don't wrap in IdentityKeyedProducer:
[error] ! StormPlatform matches Scala for left join with flatMapValues jobs
[error] ClassCastException: com.twitter.summingbird.FlatMappedProducer cannot be cast to com.twitter.summingbird.KeyedProducer (StripNamedNodes.scala:124)
[error] com.twitter.summingbird.planner.StripNamedNode$.castToKeyed(StripNamedNodes.scala:29)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$functionize$12.apply(StripNamedNodes.scala:124)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$functionize$12.apply(StripNamedNodes.scala:124)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$processLevel$1.apply(StripNamedNodes.scala:38)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$processLevel$1.apply(StripNamedNodes.scala:35)
[error] com.twitter.summingbird.planner.StripNamedNode$.processLevel(StripNamedNodes.scala:35)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$mutateGraph$2.apply(StripNamedNodes.scala:149)
[error] com.twitter.summingbird.planner.StripNamedNode$$anonfun$mutateGraph$2.apply(StripNamedNodes.scala:147)
[error] com.twitter.summingbird.planner.StripNamedNode$.mutateGraph(StripNamedNodes.scala:147)
[error] com.twitter.summingbird.planner.StripNamedNode$.stripNamedNodes(StripNamedNodes.scala:156)
[error] com.twitter.summingbird.planner.StripNamedNode$.apply(StripNamedNodes.scala:168)
[error] com.twitter.summingbird.planner.OnlinePlan$.apply(OnlinePlan.scala:223)
[error] com.twitter.summingbird.storm.Storm.plan(StormPlatform.scala:332)
[error] com.twitter.summingbird.storm.StormTestRun$.apply(StormTestRun.scala:78)
[error] com.twitter.summingbird.storm.StormTestRun$.simpleRun(StormTestRun.scala:98)
[error] com.twitter.summingbird.storm.StormLaws$$anonfun$6.apply(StormLaws.scala:232)
[error] com.twitter.summingbird.storm.StormLaws$$anonfun$6.apply(StormLaws.scala:227)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. This code was actually the predecessor of the Dag optimizer code. I bet we could remove it now and replace the remove name nodes dag rule. I feel like that might be a cleaner solution since this code is actually making a false assumption (the strip name nodes code above).
I'm not sure we should be casting to KeyedProducer in planning. I tried to make KeyedProducer unneeded and only used to add methods to Producer. |
We can revisit the re-write later, but in order to get Summingbird released I think we need this as is for now. |
IdentityKeyedProducer in ValueFlatMapToFlatMap Dag Optimizer rule
My point is that this is being fixed at the wrong place. The cast should be removed in the original code, not in the Dag code. For instance, I think just casting to |
We were downcasting ValueFlatMappedProducer to FlatMappedProducer in the ValueFlatMapToFlatMap DagOptimizer rule thus causing ClassCastExceptions when trying to cast to a KeyedProducer during planning. This wraps the cast in an IdentityKeyedProducer to preserve the fact that this is a KeyedProducer. Adds a test as well.