[SPARK-8127][Streaming][Kafka] KafkaRDD optimize count() take() isEmpty() #6632
Conversation
…ed KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.
Jenkins, add to whitelist
ok to test
At a glance this makes sense to me. Let's see what tests say.
Test build #34181 has finished for PR 6632 at commit
@@ -60,6 +62,49 @@ class KafkaRDD[
}.toArray
}

override def count(): Long = offsetRanges.map(_.count).sum
I thought about it, and I have a little concern about this. What if someone creates a KafkaRDD with wrong offset ranges that do not exist? In the current state, count will fail, which is the correct thing to do. However, with this patch, it will return a count that is technically incorrect rather than fail. It may be a good idea to validate the limits of the offset ranges by actually querying Kafka, to verify that they exist before returning the count. And do it just once.
Now checking offset ranges in the createRdd method
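For context on why a driver-side count is possible at all: each OffsetRange describes a half-open interval of Kafka offsets, so its size is known without launching a job. A simplified sketch of the idea (field and method names follow the real OffsetRange, but this is illustrative, not the actual class):

```scala
// Simplified sketch of an offset range that knows its own size.
// Because every KafkaRDD partition maps 1:1 to such a range,
// count() can be computed on the driver without a Spark job.
final case class OffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) {
  // Half-open interval [fromOffset, untilOffset)
  def count: Long = untilOffset - fromOffset
}

val offsetRanges = Array(
  OffsetRange("events", 0, 100L, 150L), // 50 messages
  OffsetRange("events", 1, 200L, 200L)) // empty partition

val total: Long = offsetRanges.map(_.count).sum // 50, no job launched
```

This is exactly why the validation concern above matters: the sum is only meaningful if the ranges actually exist in Kafka.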
Can you make a JIRA for this and add it to the title of the PR (see other PRs' formatting)? And the titles of the JIRA and PR could be a little more obvious: KafkaRDD optimizations for count() and take()
Other than that this looks very promising.
override def isEmpty(): Boolean = count == 0L

override def take(num: Int): Array[R] = {
val nonEmpty = this.partitions
nonEmptyPartitions
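The diff is truncated above, but the gist of the optimization: every KafkaRDDPartition knows its count up front, so take(num) can plan on the driver which partitions to read and how many records to pull from each, then run a job on only those partitions. A hedged sketch of that shape (names like KafkaRDDPartition.count follow the PR; the exact merged code may differ):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.TaskContext

// Inside KafkaRDD[R]: plan the take on the driver using per-partition
// counts, then run a Spark job on only the partitions actually needed.
override def take(num: Int): Array[R] = {
  val nonEmptyPartitions = this.partitions
    .map(_.asInstanceOf[KafkaRDDPartition])
    .filter(_.count > 0)

  if (num < 1 || nonEmptyPartitions.isEmpty) {
    return new Array[R](0)
  }

  // Decide in advance how many messages to take from each partition,
  // walking partitions until `num` is covered.
  val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
    val remain = num - result.values.sum
    if (remain > 0) {
      val taken = Math.min(remain, part.count)
      result + (part.index -> taken.toInt)
    } else {
      result
    }
  }

  val buf = new ArrayBuffer[R]
  // Only the partitions listed in `parts` are computed; the rest are skipped.
  val res = context.runJob(
    this,
    (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
    parts.keys.toArray)
  res.foreach(buf ++= _)
  buf.toArray
}
```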
Test build #34323 has finished for PR 6632 at commit
Test build #34348 has finished for PR 6632 at commit
@tdas is there anything else you feel needs to be done on this?
kc: KafkaCluster,
offsetRanges: Array[OffsetRange]): Unit = {
val topics = offsetRanges.map(_.topicAndPartition).toSet
val badRanges = KafkaCluster.checkErrors(for {
@koeninger I know this is probably a good Scala way, but this is kinda hard to read for the nesting. Could you take the for-yield and put it in a separate variable? And then check for errors?
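Cody's follow-up commit isn't inlined in this thread, but a refactoring along the lines requested might look like the following (a sketch assuming KafkaCluster's getEarliestLeaderOffsets/getLatestLeaderOffsets helpers; details may differ from the merged code):

```scala
import org.apache.spark.SparkException

// Sketch: verify requested offset ranges actually exist in Kafka, so a
// driver-side count can't silently report sizes for data that isn't there.
private def checkOffsets(
    kc: KafkaCluster,
    offsetRanges: Array[OffsetRange]): Unit = {
  val topics = offsetRanges.map(_.topicAndPartition).toSet

  // The for-yield is bound to a named value rather than nested inside
  // the checkErrors call, per the review comment above.
  val result = for {
    low <- kc.getEarliestLeaderOffsets(topics).right
    high <- kc.getLatestLeaderOffsets(topics).right
  } yield {
    // A range is bad if it falls outside what the leaders currently hold.
    offsetRanges.filterNot { o =>
      low(o.topicAndPartition).offset <= o.fromOffset &&
        o.untilOffset <= high(o.topicAndPartition).offset
    }
  }

  val badRanges = KafkaCluster.checkErrors(result)
  if (badRanges.nonEmpty) {
    throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
  }
}
```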
I think it's almost good to go. A few minor points.
// size-related method optimizations return sane results
assert(rdd.count === messages.size)
assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(! rdd.isEmpty)
nit: extra space
There is no check whether isEmpty is successful.
Test build #35310 has finished for PR 6632 at commit
assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(! rdd.isEmpty)
assert(rdd.take(1).size === 1)
assert(messages(rdd.take(1).head._2))
What does this check? Shouldn't it check that rdd.take(1) === "the" // whatever is expected
It's asserting that the item taken from the rdd is a member of the set of messages sent.
Shouldn't the test be stronger, checking that it returns the expected message from the right offset and not just any of the messages? Basically, if there is a bug in the code where take(1) returns the last message in the offset range rather than the first message, it won't be caught.
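A stronger assertion along these lines might pin the taken element to the first offset rather than to set membership alone (a hypothetical sketch, not the PR's actual test; it assumes collect() preserves offset order, which holds for a single-partition range):

```scala
// Hypothetical stronger check: take(1) must return the message at the
// lowest offset, not merely some message that was sent.
val inOffsetOrder = rdd.map(_._2).collect()
assert(rdd.take(1).head._2 === inOffsetOrder.head)
```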
Just a couple more comments on the tests.
Test build #35329 has finished for PR 6632 at commit
Test build #35331 has finished for PR 6632 at commit
Merging this to master, thanks a lot.
Wait, oh, the title, please fix order :/
fixed title
Merging to master.
I forgot to say, thanks Cody! :)
Cheers :)