
[SPARK-17813][SQL][KAFKA] Maximum data per trigger #15527


Closed · koeninger wants to merge 13 commits into master from koeninger:SPARK-17813

Conversation

@koeninger (Contributor) commented Oct 18, 2016

What changes were proposed in this pull request?

Adds a maxOffsetsPerTrigger option for rate limiting; the per-trigger cap is apportioned across topicpartitions in proportion to their available volume of data.
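
For illustration, enabling the new option from a streaming reader might look like this (a minimal sketch assuming an active SparkSession named spark, as in the test further down; the broker address and topic name are placeholders):

  // Sketch: cap each trigger at 10000 records in total; the cap is split
  // across topicpartitions in proportion to how much unread data each has.
  val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092") // placeholder address
    .option("subscribe", "events")                     // placeholder topic
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 10000)
    .load()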

How was this patch tested?

Added unit test

@SparkQA commented Oct 18, 2016

Test build #67113 has finished for PR 15527 at commit 6c8d459.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 24, 2016

Test build #67455 has finished for PR 15527 at commit 76ecafc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 24, 2016

Test build #67459 has finished for PR 15527 at commit fde4e33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment:

Thanks for your work. The major issue is that currentPartitionOffsets should be set when recovering from a failure.

@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(

Member:

currentPartitionOffsets should be set to untilPartitionOffsets when it is empty; an empty value means we are recovering from a failure.

Contributor Author:

Shouldn't it be set to the highest available offset in the streaming metadata log, not the highest available offset in Kafka?

Member:

currentPartitionOffsets holds the last processed offsets, right? When recovering from a failure, getBatch is called first, then getOffset.

Contributor Author:

OK, I see what you were saying; I thought you were talking about resetting it in getOffset, not about the end offsets passed to getBatch.
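
To make the resolution concrete, here is a minimal sketch of the seeding being discussed, assuming a simplified source class (the real KafkaSource works in terms of Offset and DataFrame types):

  import org.apache.kafka.common.TopicPartition

  class KafkaSourceSketch {
    // End offsets of the last batch handed to the engine; None until the
    // first getBatch call in this run of the query.
    private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None

    def getBatch(untilPartitionOffsets: Map[TopicPartition, Long]): Unit = {
      // On recovery the engine replays the last batch: getBatch is called
      // (with end offsets read back from the metadata log) before getOffset,
      // so an empty value here means we are restarting and should be seeded
      // from those replayed offsets.
      if (currentPartitionOffsets.isEmpty) {
        currentPartitionOffsets = Some(untilPartitionOffsets)
      }
      // ... construct the batch for the requested offset range as before ...
    }
  }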

StartStream(ProcessingTime(100), clock),
AdvanceManualClock(100),
// 1 from smallest, 1 from middle, 8 from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
Member:

There is a race condition here: the batch may still be running when the answer is checked. I worked out the following code, which covers recovery and fixes the race condition.

  test("maxOffsetsPerTrigger") {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 3)
    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
    testUtils.sendMessages(topic, Array("1"), Some(2))

    val reader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("kafka.metadata.max.age.ms", "1")
      .option("maxOffsetsPerTrigger", 10)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
    val kafka = reader.load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)

    val clock = new StreamManualClock

    val waitUntilBatchProcessed = AssertOnQuery { q =>
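      // Block until the current batch has been fully processed: the stream
      // should be idle, waiting on the manual clock, and any query failure is
      // rethrown instead of racing the next CheckAnswer.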
      eventually(Timeout(streamingTimeout)) {
        if (!q.exception.isDefined) {
          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
        }
      }
      if (q.exception.isDefined) {
        throw q.exception.get
      }
      true
    }

    testStream(mapped)(
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      // 1 from smallest, 1 from middle, 8 from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
      ),
      StopStream,
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
      ),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 1 more from middle, 9 more from biggest
      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
      )
    )
  }

This test currently fails because of an issue that is being fixed in #14553.

Member:

FYI, #14553 got merged.

@SparkQA commented Oct 27, 2016

Test build #67616 has finished for PR 15527 at commit 6a7ff24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment:

LGTM except some style nits.

until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap { case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
Member:

nit: use 2 spaces

until
} else {
until.map { case (tp, end) =>
tp -> sizes.get(tp).map { size =>
Member:

nit: use 2 spaces
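
For context, the rate-limiting method under review can be sketched roughly as follows (a simplified reconstruction from the quoted fragments and the test's expected "1 from smallest, 1 from middle, 8 from biggest" split; the real method also consults fetchNewPartitionEarliestOffsets for partitions it has no starting offset for):

  import org.apache.kafka.common.TopicPartition

  def rateLimit(
      limit: Long,
      from: Map[TopicPartition, Long],
      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    // Records available per topicpartition since the last batch.
    val sizes = until.flatMap { case (tp, end) =>
      from.get(tp).flatMap { begin =>
        val size = end - begin
        if (size > 0) Some(tp -> size) else None
      }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until // nothing available, nothing to limit
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val begin = from.getOrElse(tp, end)
          val prorate = limit * (size / total)
          // Round fractional shares below 1 up so small partitions are not
          // starved entirely; with limit = 10 and sizes 101/11/1 this yields
          // the 8/1/1 split asserted in the test above.
          val off = begin +
            (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          math.min(end, off) // never advance past the available end offset
        }.getOrElse(end)
      }
    }
  }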

@SparkQA commented Oct 27, 2016

Test build #67645 has finished for PR 15527 at commit 5e4b468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 27, 2016

LGTM. Thanks! Merging to master and 2.0.

asfgit pushed a commit that referenced this pull request Oct 27, 2016
## What changes were proposed in this pull request?

maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions.

## How was this patch tested?

Added unit test

Author: cody koeninger <cody@koeninger.org>

Closes #15527 from koeninger/SPARK-17813.

(cherry picked from commit 1042325)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
asfgit closed this in 1042325 on Oct 27, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017