Skip to content

[SPARK-12122][STREAMING] Prevent batches from being submitted twice after recovering StreamingContext from checkpoint #10127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Dec 3, 2015

No description provided.

@tdas
Copy link
Contributor Author

tdas commented Dec 3, 2015

@zsxwing Please check this.
I think this problem has been caused by the #9707

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #2161 has finished for PR 10127 at commit d904b25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #47134 has finished for PR 10127 at commit d904b25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #2162 has finished for PR 10127 at commit d904b25.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * class CrossValidator @Since(\"1.2.0\") (@Since(\"1.4.0\") override val uid: String)\n * class ParamGridBuilder @Since(\"1.2.0\")\n * class TrainValidationSplit @Since(\"1.5.0\") (@Since(\"1.5.0\") override val uid: String)\n

@@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
val timesToReschedule = (pendingTimes ++ downTimes).filter { _ != restartTime }
.distinct.sorted(Time.ordering)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify why pendingTimes may contain restartTime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explained offline:

The restart time is always checkpointTime+1 (assuming batch duration = 1). However, pending times can already have batches >= checkpointTime+1. This can cause timesToReschedule to have batches >= checkpointTime+1, which will be explicitly submitted, and then resubmitted through the timer.

@zsxwing
Copy link
Member

zsxwing commented Dec 3, 2015

LGTM

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #2164 has finished for PR 10127 at commit d904b25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #2165 has finished for PR 10127 at commit d904b25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 3, 2015

Test build #47163 has finished for PR 10127 at commit fe69fbf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Dec 4, 2015
…fter recovering StreamingContext from checkpoint

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #10127 from tdas/SPARK-12122.

(cherry picked from commit 4106d80)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 4106d80 Dec 4, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants