-
Notifications
You must be signed in to change notification settings - Fork 3
[WIP] Adds back-pressure based congestion handling and a Reactive Streams endpoint to Spark Streaming #13
Conversation
import java.lang.IllegalStateException | ||
|
||
@DeveloperApi | ||
abstract class ReactiveReceiver[T](storageLevel: StorageLevel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good, but since this is Developer API you should add some docs, especially on methods that are overridable (def when*
).
1b963c2
to
8baec36
Compare
nextBuffer: ArrayBuffer[Any]): Unit = { | ||
val bound = latestBound.get() | ||
val f = bound.toDouble / currentBuffer.size | ||
val samplees = currentBuffer.to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit worried about the number of copies we're making. According to scaladoc:
Converts this array buffer into another by copying all elements.
Also, it'd be good to be explicit about the destination type.
|
val bound = latestBound.get() | ||
val f = bound.toDouble / currentBuffer.size | ||
if (f > 0 && f < 1){ | ||
val samplees = currentBuffer.to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same concern about copying (and would like to use an explicit type, like .to[IndexedSeq[T]]
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'Same concern' << where was the other instance ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly at this point. Github doesn't show my old comment anymore, maybe due to some git commit reshuffling since 3 days ago.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it is. It's collapsed because GitHub thinks it's outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK! So, the currentBuffer.toIterator
is lazy, so is BernouilliSampler(...).sample(...)
. So that, if I don't make a copy of the buffer once (in samplees
), by the time I call currentBuffer.clear()
, I've lost all elements in sampled
.
I refactored this part to make things clearer (and copy after sampling rather than before) in 92767f5 (the copy is made in the toArray
).
6d2913f
to
bc3de77
Compare
Could you please fix the style check failures?
|
bc3de77
to
54dab47
Compare
Refer to this link for build results (access rights to CI server needed): Build Log
Test FAILed. |
54dab47
to
1587e42
Compare
The bot is really smoking something, whether on 2.11 or 2.10. |
adds a basic Listener test: In two (close) listener configurations, the speed of processing should remain (roughly) constant.
Equipped CongestionStrategies with default Ignore
Bumping numbers: test is stable enough for repeated running when num of elements is > 10.
plugged them into Block Generation
…some platforms Those test don't react well to low-memory situations (very quick data generation).
1587e42
to
e38edfb
Compare
Refer to this link for build results (access rights to CI server needed): |
I've seen the SQL tests fail before, seems there's some flakiness in the code or our setup. Finally it passed (I disabled the 2.11 builds because the mima step in their build is 2.10 specific. I should get around to fix that at some point, but the priority is low...) |
} | ||
|
||
def getSpeedForStreamId(streamId: Int): Option[Long] = { | ||
streamIdToElemsPerBatch.flatMap(_.get(streamId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What provides the happens-before guarantee on reading streamIdToElemsPerBatch
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any threading guarantees, so if this code is guaranteed to run in a single thread, this is not a problem. Otherwise, it's not thread safe and may miss some updates, so a concurrent map or synchronization would be needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this code only runs on one thread then the synchronized
on line 97 in this file & changeset is misleading.
If it doesn't, then this class needs, as you say, a more cohesive thread safety story.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. After reading the code around it, it's definitely called from multiple threads (also, what that synchronized
call implies). Thanks for catching this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Glad I could help!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Crap ! Thanks, Viktor.
…into a single batch. SQL ``` select * from tableA join tableB on (a > 3 and b = d) or (a > 3 and b = e) ``` Plan before modify ``` == Optimized Logical Plan == Project [a#293,b#294,c#295,d#296,e#297] Join Inner, Some(((a#293 > 3) && ((b#294 = d#296) || (b#294 = e#297)))) MetastoreRelation default, tablea, None MetastoreRelation default, tableb, None ``` Plan after modify ``` == Optimized Logical Plan == Project [a#293,b#294,c#295,d#296,e#297] Join Inner, Some(((b#294 = d#296) || (b#294 = e#297))) Filter (a#293 > 3) MetastoreRelation default, tablea, None MetastoreRelation default, tableb, None ``` CombineLimits ==> Limit(If(LessThan(ne, le), ne, le), grandChild) and LessThan is in BooleanSimplification , so CombineLimits must before BooleanSimplification and BooleanSimplification must before PushPredicateThroughJoin. Author: Zhongshuai Pei <799203320@qq.com> Author: DoingDone9 <799203320@qq.com> Closes apache#6351 from DoingDone9/master and squashes the following commits: 20de7be [Zhongshuai Pei] Update Optimizer.scala 7bc7d28 [Zhongshuai Pei] Merge pull request #17 from apache/master 0ba5f42 [Zhongshuai Pei] Update Optimizer.scala f8b9314 [Zhongshuai Pei] Update FilterPushdownSuite.scala c529d9f [Zhongshuai Pei] Update FilterPushdownSuite.scala ae3af6d [Zhongshuai Pei] Update FilterPushdownSuite.scala a04ffae [Zhongshuai Pei] Update Optimizer.scala 11beb61 [Zhongshuai Pei] Update FilterPushdownSuite.scala f2ee5fe [Zhongshuai Pei] Update Optimizer.scala be6b1d5 [Zhongshuai Pei] Update Optimizer.scala b01e622 [Zhongshuai Pei] Merge pull request #15 from apache/master 8df716a [Zhongshuai Pei] Update FilterPushdownSuite.scala d98bc35 [Zhongshuai Pei] Update FilterPushdownSuite.scala fa65718 [Zhongshuai Pei] Update Optimizer.scala ab8e9a6 [Zhongshuai Pei] Merge pull request #14 from apache/master 14952e2 [Zhongshuai Pei] Merge pull request #13 from apache/master f03fe7f [Zhongshuai Pei] Merge pull request #12 from apache/master f12fa50 [Zhongshuai Pei] Merge pull request #10 from apache/master f61210c [Zhongshuai Pei] Merge pull request #9 from apache/master 34b1a9a [Zhongshuai Pei] Merge pull request #8 from apache/master 802261c [DoingDone9] Merge pull request #7 from apache/master d00303b [DoingDone9] Merge pull request #6 from apache/master 98b134f [DoingDone9] Merge pull request #5 from apache/master 161cae3 [DoingDone9] Merge pull request #4 from apache/master c87e8b6 [DoingDone9] Merge pull request #3 from apache/master cb1852d [DoingDone9] Merge pull request #2 from apache/master c3f046f [DoingDone9] Merge pull request #1 from apache/master
} | ||
} | ||
|
||
private case class CancelException(s: Subscription) extends SpecificationViolation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Textual logs are harder to analyze with messages on multiple lines. Can we make this and the subsequent messages a single line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think they all are (there's a replaceAll("\n", "")
at the end of the line)
Follows and supersedes #11, #9.