[SPARK-23816][CORE] Killed tasks should ignore FetchFailures. #20987

Closed
wants to merge 4 commits into from

squito
Contributor

@squito squito commented Apr 5, 2018

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure. This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data. The fix is to ensure that we always check for killed
tasks first.

Added a new unit test which fails before the fix, and ran it 1k times to
check for flakiness. Ran the full suite of tests on Jenkins.
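
As a rough illustration of the ordering described above (not the actual `Executor` code), the sketch below classifies a failure by checking for a kill before consulting any recorded fetch failure. `Outcome`, `Killed`, `FetchFailed`, `OtherFailure`, the local `TaskKilledException`, and `classify` are hypothetical stand-ins invented for this example.

```scala
// Hypothetical, simplified stand-ins for Spark's task-failure types.
sealed trait Outcome
final case class Killed(reason: String) extends Outcome
final case class FetchFailed(message: String) extends Outcome
final case class OtherFailure(message: String) extends Outcome

// Local stand-in for the exception thrown when a task notices it was killed.
final class TaskKilledException(val reason: String) extends RuntimeException(reason)

object KilledFirst {
  // `killReason` models "was a kill requested for this task?".
  // `recordedFetchFailure` models a fetch failure noted while reading shuffle data.
  def classify(
      thrown: Throwable,
      killReason: Option[String],
      recordedFetchFailure: Option[String]): Outcome = thrown match {
    // Killed tasks are checked first, so a kill is never reported as a fetch failure.
    case t: TaskKilledException => Killed(t.reason)
    case _ if killReason.isDefined => Killed(killReason.get)
    // Only a task that was not killed reports the recorded fetch failure.
    case _ if recordedFetchFailure.isDefined => FetchFailed(recordedFetchFailure.get)
    case t => OtherFailure(String.valueOf(t.getMessage))
  }
}
```

With this ordering, a speculative task that is interrupted while reading shuffle data is classified as killed even if a fetch failure was recorded along the way.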

@SparkQA

SparkQA commented Apr 6, 2018

Test build #88958 has finished for PR 20987 at commit d886ba3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Apr 6, 2018

pinging some potential reviewers: @tgravescs @kayousterhout @zsxwing @mridulm

} else if (interrupt) {
// make sure our test is setup correctly
assert(TaskContext.get().asInstanceOf[TaskContextImpl].fetchFailed.isDefined)
// signal our test is ready for the task to get killed
Member

Maybe say "signal killingThread in our test" here instead, since killingThread is what is actually waiting on latch1.

Contributor Author

I prefer the original comment -- the mechanics of what is waiting on the latch are easy enough to follow; it's more important to explain why.

assert(failReason.isInstanceOf[TaskKilled])
}

def testFetchFailureHandling(oom: Boolean): (TaskFailedReason, UncaughtExceptionHandler) = {
Member

Maybe we can add a comment about `oom`, something like 'when oom = true, handle the OOM case; otherwise, handle the interrupt case', since this method has gotten a little complicated.

Contributor Author

Yeah, good point, this is pretty confusing now. I pushed an update with more comments.

@Ngone51
Member

Ngone51 commented Apr 6, 2018

My concern is whether a situation like this can exist: a task gets killed after it hits a FetchFailure, is re-run later, and hits a FetchFailure again, this time without a TaskKilledException. (Or is this fix aimed at speculative tasks only?)

Of course, we can handle the FetchFailure during the re-run. But it would be better to handle the FetchFailure earlier, when we get those two exceptions in order within the same task run. And then we can still handle the TaskKilledException.

But this may require us to determine whether the task will be re-run.

setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(
taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))

case CausedBy(cDE: CommitDeniedException) =>
Contributor

I should have seen this when reviewing the original change, my bad; thanks for fixing this @squito !

Contributor Author

I should have caught it too :) Kay mentioned OOM handling on the original PR, but we didn't think about interrupts.

@mridulm
Contributor

mridulm commented Apr 6, 2018

Thanks for fixing this @squito, LGTM

@squito
Contributor Author

squito commented Apr 6, 2018

> My concern is whether a situation like this can exist: a task gets killed after it hits a FetchFailure, is re-run later, and hits a FetchFailure again, this time without a TaskKilledException. (Or is this fix aimed at speculative tasks only?)

I don't think it's worth trying to be fancy here -- in almost all situations, we don't care about the fetch failure handling when the task is killed. Even if this task is not speculative, it could be that it's killed because another speculative task finished, and so this one gets aborted.

Suppose there is a real fetch failure, and you just happen to kill the task just after that. Since the task was killed, you don't really care about the input shuffle data at the moment in any case. You might run another job later on that tries to read the same shuffle data, and then it'll have to rediscover the missing shuffle data. But that's about it. Oh well.

@Ngone51
Member

Ngone51 commented Apr 6, 2018

Yeah, agreed.

Contributor

@vanzin vanzin left a comment

LGTM.

val mockBackend = mock[ExecutorBackend]
val mockUncaughtExceptionHandler = mock[UncaughtExceptionHandler]
var executor: Executor = null
var killingThread: Thread = null
Contributor

Not used? Or did you mean to join this thread or something like that?

Contributor Author

Yeah, good point -- I was originally thinking of that, but I don't think it is needed. However, I did get rid of the indefinite awaits.
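
For readers unfamiliar with the distinction, a bounded wait on a latch might look like the sketch below. `BoundedAwait` and `awaitOrGiveUp` are hypothetical names chosen for this example; only `CountDownLatch.await(timeout, unit)` is a real JDK call, and this is not the code from the PR's test.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object BoundedAwait {
  // Returns true if the latch reached zero before the timeout, false otherwise.
  // Unlike the no-argument await(), this cannot hang a test forever.
  def awaitOrGiveUp(latch: CountDownLatch, timeoutMs: Long): Boolean =
    latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}
```

A test can then assert on the returned Boolean, so a missed signal shows up as a failure rather than a hung build.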

@SparkQA

SparkQA commented Apr 6, 2018

Test build #88991 has finished for PR 20987 at commit b387552.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Apr 8, 2018

I filed https://issues.apache.org/jira/browse/SPARK-23894 for the test failure -- appears to be a flaky test

@SparkQA

SparkQA commented Apr 8, 2018

Test build #89019 has finished for PR 20987 at commit b1997a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 8, 2018

Test build #89032 has finished for PR 20987 at commit 3860aed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 jiangxb1987 left a comment

LGTM

@vanzin
Contributor

vanzin commented Apr 9, 2018

Merging to master / 2.3 / 2.2.

asfgit pushed a commit that referenced this pull request Apr 9, 2018
SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.

Added a new unit test which fails before the fix, ran it 1k times to
check for flakiness.  Full suite of tests on jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #20987 from squito/SPARK-23816.

(cherry picked from commit 10f45bb)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@asfgit asfgit closed this in 10f45bb Apr 9, 2018
asfgit pushed a commit that referenced this pull request Apr 9, 2018
csd-jenkins pushed a commit to alteryx/spark that referenced this pull request Apr 10, 2018
csd-jenkins pushed a commit to alteryx/spark that referenced this pull request May 1, 2018
* [SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

* [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```

The result should be empty, but two rows are returned.
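
As a reference point for the expected behavior, the semantics of `<=>` can be modeled in plain Scala with `Option[Double]`, where `None` stands in for SQL `NULL`. This is only a sketch: `NullSafeEq` and `eqNullSafe` are hypothetical names, and Spark's handling of NaN and signed zero is not modeled.

```scala
object NullSafeEq {
  // Null-safe equality: two NULLs are equal, a NULL never equals a value.
  def eqNullSafe(a: Option[Double], b: Option[Double]): Boolean = (a, b) match {
    case (None, None)       => true
    case (Some(x), Some(y)) => x == y // plain value equality for this sketch
    case _                  => false  // exactly one side is NULL
  }
}
```

Under this model both rows in the example compare a value with `NULL`, so neither should pass the filter.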

Added a test.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#21094 from ueshin/issues/SPARK-24007/equalnullsafe.

(cherry picked from commit f09a9e9)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

* [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the query increased.

I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better.
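
The cost difference can be sketched in isolation. The example below is not the `TableReader` code; `ColumnLookup`, `ColumnMeta`, and both methods are hypothetical and only contrast indexing into a linked list with indexing into an array.

```scala
object ColumnLookup {
  final case class ColumnMeta(name: String, ordinal: Int)

  // Indexing a List walks i nodes on every call, so one pass over
  // n columns costs O(n^2) node visits in total.
  def namesViaList(meta: List[ColumnMeta]): Seq[String] =
    meta.indices.map(i => meta(i).name)

  // Converting once to an Array makes each lookup constant time,
  // so the same pass costs O(n).
  def namesViaArray(meta: List[ColumnMeta]): Seq[String] = {
    val arr = meta.toArray
    arr.indices.map(i => arr(i).name)
  }
}
```

Both methods return the same names; only the per-lookup cost differs, which is why the slowdown grows disproportionately with the number of columns.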

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins <bersprockets@gmail.com>

Closes apache#21043 from bersprockets/tabreadfix.

* [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66

## How was this patch tested?
N/A

Author: seancxmao <seancxmao@gmail.com>

Closes apache#21113 from seancxmao/SPARK-13136.

(cherry picked from commit c303b1b)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
csd-jenkins pushed a commit to alteryx/spark that referenced this pull request May 15, 2018
* [SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

* [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

* [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

* [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

## What changes were proposed in this pull request?
Shell-escape the name passed to spark-submit and change how conf attributes are shell-escaped.

## How was this patch tested?
This patch has been tested manually with Hive-on-Spark on Mesos, and with the use case described in the issue: the SparkPi application with a custom name that contains illegal shell characters.

With this PR, hive-on-spark on mesos works like a charm with hive 3.0.0-SNAPSHOT.

I state that this contribution is my original work and that I license the work to the project under the project’s open source license

Author: Bounkong Khamphousone <bounkong.khamphousone@ebiznext.com>

Closes apache#21014 from tiboun/fix/SPARK-23941.

(cherry picked from commit 6782359)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-23433][CORE] Late zombie task completions update all tasksets

Fetch failures lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
Signed-off-by: Imran Rashid <irashid@cloudera.com>

* [SPARK-23489][SQL][TEST][BRANCH-2.2] HiveExternalCatalogVersionsSuite should verify the downloaded file

## What changes were proposed in this pull request?

This is a backport of apache#21210 because `branch-2.2` also faces the same failures.

Although [SPARK-22654](https://issues.apache.org/jira/browse/SPARK-22654) made `HiveExternalCatalogVersionsSuite` download from Apache mirrors three times, it has been flaky because it didn't verify the downloaded file. Some Apache mirrors terminate the download abnormally, and the *corrupted* file produces the following errors.

```
gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
22:46:32.700 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.HiveExternalCatalogVersionsSuite, thread names: Keep-Alive-Timer =====

*** RUN ABORTED ***
  java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.0"): error=2, No such file or directory
```

This has been reported in two confusing ways. For example, the above case is reported as Case 2, `no failures`.

- Case 1. [Test Result (1 failure / +1)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4389/)
- Case 2. [Test Result (no failures)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/4811/)

This PR aims to make `HiveExternalCatalogVersionsSuite` more robust by verifying the downloaded `tgz` file: it extracts the archive and checks that `bin/spark-submit` exists. If the file turns out to be empty or corrupted, `HiveExternalCatalogVersionsSuite` will retry, as it does for a download failure.
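
The verify-then-retry idea can be sketched as follows. This is not the suite's actual code: `VerifiedDownload`, `downloadWithRetry`, and the `fetchAndExtract` callback are hypothetical, and the callback is assumed to download and unpack a release into the given directory.

```scala
import java.io.File

object VerifiedDownload {
  // `fetchAndExtract` is a hypothetical callback: it downloads and unpacks a
  // release into `dir` and returns false if the download itself failed.
  def downloadWithRetry(dir: File, maxAttempts: Int)(fetchAndExtract: File => Boolean): Boolean =
    (1 to maxAttempts).exists { _ =>
      // A download only counts as good if the launcher script is really there.
      fetchAndExtract(dir) && new File(dir, "bin/spark-submit").exists()
    }
}
```

Because `exists` stops at the first successful attempt, a corrupted archive costs one extra attempt rather than failing the whole run.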

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#21232 from dongjoon-hyun/SPARK-23489-2.

* [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if `AccumulableParam` doesn't define equals/hashCode.

This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.
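
The difference between the two checks can be illustrated outside Spark. In the sketch below, `ZeroCheck`, `Buffer`, and both methods are hypothetical; `Buffer` deliberately does not override `equals`/`hashCode`, so `==` falls back to reference comparison against a freshly built object.

```scala
object ZeroCheck {
  // A value type that does not override equals/hashCode.
  final class Buffer(val items: List[Int])

  // The single stored zero instance.
  val zero: Buffer = new Buffer(Nil)

  // Comparing against a freshly built zero is always false here,
  // because == defaults to reference equality for this class.
  def isZeroByEquals(v: Buffer): Boolean = v == new Buffer(Nil)

  // Comparing references against the stored zero does not depend on equals.
  def isZeroByReference(v: Buffer): Boolean = v eq zero
}
```

Here `isZeroByEquals(zero)` is false even for the zero value itself, while `isZeroByReference(zero)` is true.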

## How was this patch tested?

a new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-21278][PYSPARK] Upgrade to Py4J 0.10.6

This PR aims to bump Py4J in order to fix the following float/double bug.
Py4J 0.10.5 fixes this (py4j/py4j#272) and the latest Py4J is 0.10.6.

**BEFORE**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+--------------------+
|(id + 17.1335742042)|
+--------------------+
|       17.1335742042|
+--------------------+
```

**AFTER**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+-------------------------+
|(id + 17.133574204226083)|
+-------------------------+
|       17.133574204226083|
+-------------------------+
```

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#18546 from dongjoon-hyun/SPARK-21278.

(cherry picked from commit c8d0aba)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve

`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes.

This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):

``` scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

spark.time(sql(query))
```
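
The general idea of replacing a linear search with an index can be sketched as below. This is not the structure added to `resolve(...)`; `AttributeIndex`, `Attr`, and the three methods are hypothetical, and case-insensitive matching is an assumption made for the example.

```scala
import java.util.Locale

object AttributeIndex {
  final case class Attr(name: String, id: Int)

  // Linear search: every lookup scans all attributes.
  def linearFind(attrs: Seq[Attr], name: String): Seq[Attr] =
    attrs.filter(_.name.equalsIgnoreCase(name))

  // Build the index once, keyed by lower-cased name.
  def buildIndex(attrs: Seq[Attr]): Map[String, Seq[Attr]] =
    attrs.groupBy(_.name.toLowerCase(Locale.ROOT))

  // Indexed lookup: one hash probe instead of a full scan.
  def indexedFind(index: Map[String, Seq[Attr]], name: String): Seq[Attr] =
    index.getOrElse(name.toLowerCase(Locale.ROOT), Nil)
}
```

Resolving n names against n attributes drops from roughly n^2 comparisons to n hash lookups after a one-time index build.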

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#14083 from hvanhovell/SPARK-16406.

* [PYSPARK] Update py4j to version 0.10.7.

(cherry picked from commit cc613b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 323dc3a)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARKR] Match pyspark features in SparkR communication protocol.

(cherry picked from commit 628c7b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 16cd9ac)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* Keep old-style messages for AnalysisException with ambiguous references
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018

Change-Id: I67f08fb451ec834673965026af22201e9c50456c
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023