This repository was archived by the owner on May 9, 2024. It is now read-only.

[WIP] Adds back-pressure based congestion handling and a Reactive Streams endpoint to Spark Streaming #13

Conversation

huitseeker

Follows and supersedes #11, #9.

Review on Reviewable

import java.lang.IllegalStateException

@DeveloperApi
abstract class ReactiveReceiver[T](storageLevel: StorageLevel)

The code looks good, but since this is a Developer API you should add some docs, especially on the methods that are overridable (the def when* family).
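For instance, each overridable hook could carry a scaladoc comment stating when it fires and what overriders may assume. A minimal sketch (the hook names `whenCongested` / `whenRelieved` are illustrative, not this PR's actual API):

```scala
// Illustrative sketch only: `whenCongested` / `whenRelieved` are
// hypothetical names, not methods from this PR.
abstract class ReactiveReceiverDocSketch[T] {
  /** Called when the buffered element count exceeds the current bound.
   *  Implementations must not block the receiving thread. */
  def whenCongested(bufferedCount: Int): Unit = ()

  /** Called once the buffer drops back under the bound; a safe place
   *  to resume pulling from the source. */
  def whenRelieved(): Unit = ()
}
```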

@huitseeker huitseeker force-pushed the ReactiveStreamingBackPressureControl branch from 1b963c2 to 8baec36 Compare June 12, 2015 12:25
nextBuffer: ArrayBuffer[Any]): Unit = {
val bound = latestBound.get()
val f = bound.toDouble / currentBuffer.size
val samplees = currentBuffer.to

I'm a bit worried about the number of copies we're making. According to scaladoc:

Converts this array buffer into another by copying all elements.

Also, it'd be good to be explicit about the destination type.

@huitseeker
Author

Note to self : log dropping rate in destructive strategies.

val bound = latestBound.get()
val f = bound.toDouble / currentBuffer.size
if (f > 0 && f < 1){
val samplees = currentBuffer.to

Same concern about copying (and it would be good to use an explicit destination type, e.g. .to[IndexedSeq]).

Author


'Same concern' << where was the other instance?


Exactly at this point. GitHub doesn't show my old comment anymore, maybe due to some git commit reshuffling over the last 3 days.


Here it is. It's collapsed because GitHub thinks it's outdated.

Author


OK! So, currentBuffer.toIterator is lazy, and so is BernoulliSampler(...).sample(...). As a result, if I don't make a copy of the buffer once (in samplees), then by the time I call currentBuffer.clear() I've lost all the elements in sampled.

I refactored this part to make things clearer (and copy after sampling rather than before) in 92767f5 (the copy is made in the toArray).
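The pitfall described above can be reproduced in isolation. A minimal sketch, using a plain filtered iterator in place of BernoulliSampler (which behaves the same way with respect to laziness):

```scala
import scala.collection.mutable.ArrayBuffer

// The filtered iterator is lazy, like sample(...): it reads from the
// buffer only when forced. Forcing it into an Array *before* clearing
// the buffer (the toArray copy mentioned above) keeps the elements.
val currentBuffer = ArrayBuffer(1, 2, 3, 4, 5)
val sampled = currentBuffer.iterator.filter(_ % 2 == 1) // still lazy
val kept = sampled.toArray                              // copy happens here
currentBuffer.clear()
// `kept` survives the clear; a lazy iterator forced only at this
// point would see an empty buffer instead.
```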

@dragos

dragos commented Jun 17, 2015

Could you please fix the style check failures?

Scalastyle checks failed at following occurrences:
[error] /home/ubuntu/workspace/ghprb-spark-multi-conf/label/Spark-Ora-JDK7-PV/scala_version/2.10/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala:233: File line length exceeds 100 characters
[error] /home/ubuntu/workspace/ghprb-spark-multi-conf/label/Spark-Ora-JDK7-PV/scala_version/2.10/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala:235: File line length exceeds 100 characters
[error] /home/ubuntu/workspace/ghprb-spark-multi-conf/label/Spark-Ora-JDK7-PV/scala_version/2.10/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala:233:27: No space after token :
[error] (streaming/test:scalastyle) errors exist
[error] Total time: 11 s, completed Jun 17, 2015 5:03:22 AM
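Both failure kinds above are mechanical style issues. A hypothetical before/after for the "No space after token :" case (the actual line 233 of ReceiverSuite.scala is not shown in this thread; the function below is made up for illustration):

```scala
// Before (fails scalastyle): no space after ':', line over 100 chars
// def recordedBlocks(xs:Seq[Int]): Seq[Int] = xs.filter(_ >= 0) // ... plus trailing content past column 100
// After: space added after ':', long line wrapped under 100 characters
def recordedBlocks(xs: Seq[Int]): Seq[Int] =
  xs.filter(_ >= 0)
```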

@huitseeker huitseeker changed the title Adds back-pressure based congestion handling and a Reactive Streams endpoint to Spark Streaming [WIP] Adds back-pressure based congestion handling and a Reactive Streams endpoint to Spark Streaming Jun 17, 2015
@huitseeker huitseeker force-pushed the ReactiveStreamingBackPressureControl branch from bc3de77 to 54dab47 Compare June 17, 2015 22:12
@typesafe-tools
Collaborator

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/30/

Build Log
last 10 lines

[...truncated 20 lines...]
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.11
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark JDK-7 PV (i-bdc6216e)
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark-Ora-JDK7-PV
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.11 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 54dab47ff422fbdcbe24796702f49bf103d909df to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/30/ and message: Merged build finished.

Test FAILed.

@huitseeker huitseeker force-pushed the ReactiveStreamingBackPressureControl branch from 54dab47 to 1587e42 Compare June 18, 2015 13:29
@huitseeker
Author

The bot is really smoking something, whether on 2.11 or 2.10.

@huitseeker huitseeker force-pushed the ReactiveStreamingBackPressureControl branch from 1587e42 to e38edfb Compare June 18, 2015 19:31
@typesafe-tools
Collaborator

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/32/
Test PASSed.

@dragos

dragos commented Jun 19, 2015

I've seen the SQL tests fail before; there seems to be some flakiness in the code or our setup. It finally passed. (I disabled the 2.11 builds because the MiMa step in their build is 2.10-specific. I should get around to fixing that at some point, but the priority is low...)

}

def getSpeedForStreamId(streamId: Int): Option[Long] = {
streamIdToElemsPerBatch.flatMap(_.get(streamId))


What provides the happens-before guarantee on reading streamIdToElemsPerBatch?


I don't see any threading guarantees, so if this code is guaranteed to run in a single thread, this is not a problem. Otherwise, it's not thread safe and may miss some updates, so a concurrent map or synchronization would be needed.


If this code only runs on one thread then the synchronized on line 97 in this file & changeset is misleading.
If it doesn't, then this class needs, as you say, a more cohesive thread safety story.


Agreed. After reading the code around it, it's definitely called from multiple threads (which is also what that synchronized call implies). Thanks for catching this.


Glad I could help!

Author


Crap! Thanks, Viktor.
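One way to give readers of streamIdToElemsPerBatch the missing happens-before edge is to publish the map through an AtomicReference. A minimal sketch (the enclosing class and its update method are hypothetical; only getSpeedForStreamId mirrors the snippet under review):

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: publishing the map via an AtomicReference makes
// every write visible to subsequent reads on other threads, so lookups
// cannot miss updates the way a plain mutable field can.
class SpeedTracker {
  private val streamIdToElemsPerBatch =
    new AtomicReference[Option[Map[Int, Long]]](None)

  def update(m: Map[Int, Long]): Unit =
    streamIdToElemsPerBatch.set(Some(m))

  def getSpeedForStreamId(streamId: Int): Option[Long] =
    streamIdToElemsPerBatch.get().flatMap(_.get(streamId))
}
```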

dragos pushed a commit that referenced this pull request Jul 2, 2015
…into a single batch.

SQL
```
select * from tableA join tableB on (a > 3 and b = d) or (a > 3 and b = e)
```
Plan before modify
```
== Optimized Logical Plan ==
Project [a#293,b#294,c#295,d#296,e#297]
 Join Inner, Some(((a#293 > 3) && ((b#294 = d#296) || (b#294 = e#297))))
  MetastoreRelation default, tablea, None
  MetastoreRelation default, tableb, None
```
Plan after modify
```
== Optimized Logical Plan ==
Project [a#293,b#294,c#295,d#296,e#297]
 Join Inner, Some(((b#294 = d#296) || (b#294 = e#297)))
  Filter (a#293 > 3)
   MetastoreRelation default, tablea, None
  MetastoreRelation default, tableb, None
```

CombineLimits produces Limit(If(LessThan(ne, le), ne, le), grandChild), and that LessThan is handled by BooleanSimplification, so CombineLimits must run before BooleanSimplification, and BooleanSimplification must run before PushPredicateThroughJoin.
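The factoring that enables the pushed-down Filter in the plan above is the boolean identity (a && b) || (a && c) == a && (b || c). A quick sanity check on plain booleans (illustrative only, not Catalyst's implementation):

```scala
// Factor the common conjunct `a` out of the disjunction, mirroring how
// `a > 3` is extracted from both branches of the join condition above.
def factorCommon(a: Boolean, b: Boolean, c: Boolean): Boolean =
  a && (b || c)
```

An exhaustive check over all eight input combinations confirms the two forms are equivalent.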

Author: Zhongshuai Pei <799203320@qq.com>
Author: DoingDone9 <799203320@qq.com>

Closes apache#6351 from DoingDone9/master and squashes the following commits:

20de7be [Zhongshuai Pei] Update Optimizer.scala
7bc7d28 [Zhongshuai Pei] Merge pull request #17 from apache/master
0ba5f42 [Zhongshuai Pei] Update Optimizer.scala
f8b9314 [Zhongshuai Pei] Update FilterPushdownSuite.scala
c529d9f [Zhongshuai Pei] Update FilterPushdownSuite.scala
ae3af6d [Zhongshuai Pei] Update FilterPushdownSuite.scala
a04ffae [Zhongshuai Pei] Update Optimizer.scala
11beb61 [Zhongshuai Pei] Update FilterPushdownSuite.scala
f2ee5fe [Zhongshuai Pei] Update Optimizer.scala
be6b1d5 [Zhongshuai Pei] Update Optimizer.scala
b01e622 [Zhongshuai Pei] Merge pull request #15 from apache/master
8df716a [Zhongshuai Pei] Update FilterPushdownSuite.scala
d98bc35 [Zhongshuai Pei] Update FilterPushdownSuite.scala
fa65718 [Zhongshuai Pei] Update Optimizer.scala
ab8e9a6 [Zhongshuai Pei] Merge pull request #14 from apache/master
14952e2 [Zhongshuai Pei] Merge pull request #13 from apache/master
f03fe7f [Zhongshuai Pei] Merge pull request #12 from apache/master
f12fa50 [Zhongshuai Pei] Merge pull request #10 from apache/master
f61210c [Zhongshuai Pei] Merge pull request #9 from apache/master
34b1a9a [Zhongshuai Pei] Merge pull request #8 from apache/master
802261c [DoingDone9] Merge pull request #7 from apache/master
d00303b [DoingDone9] Merge pull request #6 from apache/master
98b134f [DoingDone9] Merge pull request #5 from apache/master
161cae3 [DoingDone9] Merge pull request #4 from apache/master
c87e8b6 [DoingDone9] Merge pull request #3 from apache/master
cb1852d [DoingDone9] Merge pull request #2 from apache/master
c3f046f [DoingDone9] Merge pull request #1 from apache/master
}
}

private case class CancelException(s: Subscription) extends SpecificationViolation {


Textual logs are harder to analyze with messages on multiple lines. Can we make this and the subsequent messages a single line?


I think they all are (there's a replaceAll("\n", "") at the end of the line)
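The flattening being discussed can be sketched as follows (an illustrative helper, not the PR's exact code):

```scala
// Collapse a potentially multi-line violation message onto one line,
// as the replaceAll("\n", "") mentioned above does, so that textual
// log analyzers see one record per message.
def singleLineMessage(msg: String): String = msg.replaceAll("\n", "")
```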

6 participants