[SPARK-8127][Streaming][Kafka] KafkaRDD optimize count() take() isEmpty() #6632


Closed
koeninger wants to merge 7 commits

Conversation

@koeninger (Contributor)

Take advantage of offset range info for size-related KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.
@srowen (Member) commented Jun 4, 2015

Jenkins, add to whitelist

@srowen (Member) commented Jun 4, 2015

ok to test

@srowen (Member) commented Jun 4, 2015

At a glance this makes sense to me. Let's see what tests say.

@SparkQA commented Jun 4, 2015

Test build #34181 has finished for PR 6632 at commit c3768c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -60,6 +62,49 @@ class KafkaRDD[
}.toArray
}

override def count(): Long = offsetRanges.map(_.count).sum

Contributor:
I thought about it, and I have a little concern about this. What if someone creates a KafkaRDD with wrong offset ranges that do not exist? In the current state, count will fail, which is the correct thing to do. However, with this patch, it will return a count that is technically incorrect, rather than fail. It may be a good idea to validate the limits of the offset ranges by actually querying Kafka, to verify that they exist, before returning the count. And do it just once.

Contributor Author:
Now checking offset ranges in the createRdd method
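
For context, the optimization computes the count purely from offset range metadata, without touching Kafka. A minimal, simplified sketch of the idea (the real OffsetRange lives in org.apache.spark.streaming.kafka; this standalone model is only illustrative):

// Simplified model: a half-open interval of Kafka offsets for one partition.
case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // The message count is known up front, with no I/O.
  def count: Long = untilOffset - fromOffset
}

// count() then just sums the per-partition range sizes:
def rddCount(offsetRanges: Array[SimpleOffsetRange]): Long =
  offsetRanges.map(_.count).sum

This is also why validating the ranges up front matters: once the count comes from metadata alone, bad ranges would otherwise go unnoticed.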

@tdas (Contributor) commented Jun 4, 2015

Can you make a JIRA for this and add it to the title of the PR (see other PRs' formatting)? And the titles of the JIRA and PR could be a little more obvious: "KafkaRDD optimizations for count() and take()".

@tdas (Contributor) commented Jun 4, 2015

Other than that, this looks very promising.

override def isEmpty(): Boolean = count == 0L

override def take(num: Int): Array[R] = {
val nonEmpty = this.partitions

Contributor:
nonEmptyPartitions
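
For readers following the diff, the idea behind the take(num) optimization is to use the per-partition counts already known from the offset ranges to plan, in advance, how many messages to pull from each non-empty partition. A simplified, standalone sketch of that planning step (PartInfo is a stand-in for the real KafkaRDDPartition; the real code then runs a Spark job over just the selected partitions):

// Stand-in for a partition: its index plus the message count of its offset range.
case class PartInfo(index: Int, count: Long)

// Walk the non-empty partitions front to back, assigning messages to take
// from each one until num messages are covered.
def planTake(num: Int, partitions: Seq[PartInfo]): Map[Int, Int] = {
  val nonEmptyPartitions = partitions.filter(_.count > 0)
  nonEmptyPartitions.foldLeft(Map.empty[Int, Int]) { (acc, part) =>
    val remaining = num - acc.values.sum
    if (remaining > 0) acc + (part.index -> math.min(remaining, part.count).toInt)
    else acc
  }
}

// Example: planTake(3, Seq(PartInfo(0, 2), PartInfo(1, 0), PartInfo(2, 5)))
//   == Map(0 -> 2, 2 -> 1), i.e. partition 1 is skipped entirely.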

@koeninger koeninger changed the title [Streaming][Kafka] Take advantage of offset range info for size-relat… [Streaming][Kafka][SPARK-8127] KafkaRDD optimize count() take() isEmpty() Jun 5, 2015
@SparkQA commented Jun 5, 2015

Test build #34323 has finished for PR 6632 at commit 8974b9e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 6, 2015

Test build #34348 has finished for PR 6632 at commit 253031d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger (Contributor Author)

@tdas is there anything else you feel needs to be done on this?

kc: KafkaCluster,
offsetRanges: Array[OffsetRange]): Unit = {
val topics = offsetRanges.map(_.topicAndPartition).toSet
val badRanges = KafkaCluster.checkErrors(for {

Contributor:
@koeninger I know this is probably a good Scala way, but it's kinda hard to read because of the nesting. Could you take the for-yield and put it in a separate variable, and then check for errors?
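
A sketch of the refactor being asked for: pull the for-yield into its own variable, then pass it to checkErrors. The KafkaCluster method names and Either-based return types here are read off the diff context, so treat this as an illustration rather than the final code:

val result = for {
  low <- kc.getEarliestLeaderOffsets(topics).right
  high <- kc.getLatestLeaderOffsets(topics).right
} yield {
  // Keep only the ranges whose endpoints fall outside what the leaders hold.
  offsetRanges.filterNot { o =>
    low(o.topicAndPartition).offset <= o.fromOffset &&
      o.untilOffset <= high(o.topicAndPartition).offset
  }
}

val badRanges = KafkaCluster.checkErrors(result)
if (badRanges.nonEmpty) {
  throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
}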

@tdas (Contributor) commented Jun 19, 2015

I think it's almost good to go. A few minor points.

// size-related method optimizations return sane results
assert(rdd.count === messages.size)
assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(! rdd.isEmpty)

Contributor:
nit: extra space

Contributor:
There is no check whether isEmpty is successful on an actually empty RDD.
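
A sketch of the kind of check being asked for, using an offset range that is empty by construction (fromOffset == untilOffset); the createRDD call mirrors the suite's existing usage, but the exact setup here is assumed:

// An empty range: no messages between offsets 0 and 0.
val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))

assert(emptyRdd.isEmpty)
assert(emptyRdd.take(1).isEmpty)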

@SparkQA commented Jun 19, 2015

Test build #35310 has finished for PR 6632 at commit f68bd32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(! rdd.isEmpty)
assert(rdd.take(1).size === 1)
assert(messages(rdd.take(1).head._2))

Contributor:
What does this check? Shouldn't it check that rdd.take(1) === "the" // whatever is expected

Contributor Author:
It's asserting that the item taken from the RDD is a member of the set of messages sent.

Contributor:
Shouldn't the test be stronger, checking that it returns the expected message from the right offset and not just any of the messages? Basically, if there is a bug in the code where take(1) returns the last message in the offset range rather than the first, it won't be caught.
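
A sketch of the stronger, order-aware assertion (sentMessages here is a hypothetical ordered sequence of the messages as they were sent):

// take(1) must return the message at the *start* of the offset range,
// not merely some member of the sent set.
assert(rdd.take(1).head._2 === sentMessages.head)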

@tdas (Contributor) commented Jun 19, 2015

Just a couple of more comments on the tests.

@SparkQA commented Jun 19, 2015

Test build #35329 has finished for PR 6632 at commit 5a05d0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

@SparkQA commented Jun 20, 2015

Test build #35331 has finished for PR 6632 at commit 321340d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

@tdas (Contributor) commented Jun 20, 2015

Merging this to master, thanks a lot.

@tdas (Contributor) commented Jun 20, 2015

Wait, oh, the title, please fix the order :/

@koeninger koeninger changed the title [Streaming][Kafka][SPARK-8127] KafkaRDD optimize count() take() isEmpty() [SPARK-8127][Streaming][Kafka] KafkaRDD optimize count() take() isEmpty() Jun 20, 2015
@koeninger (Contributor Author)

fixed title

@tdas (Contributor) commented Jun 20, 2015

Merging to master.

@asfgit closed this in 1b6fe9b Jun 20, 2015
@tdas (Contributor) commented Jun 24, 2015

I forgot to say, thanks Cody! :)

@koeninger (Contributor Author)

Cheers :)

