JDBC client should catch task failed exception instead of NPE in the incremental mode #4711
Conversation
    warn(s"Ignore exception in terminal state with $statementId: $errMsg")
    val ke = KyuubiSQLException(errMsg)
    setOperationException(ke)
    throw ke
Always re-throwing the exception makes sense to me; I'm not sure whether it will break some cases.
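For context, here is a minimal, self-contained sketch of the behavior under discussion. It is not the actual Kyuubi source: `KyuubiSQLException`, `setOperationException`, `warn`, and `statementId` are simplified stand-ins defined locally to mirror the snippet above. The idea is that when a fetch fails while the operation is already in a terminal state, the error is recorded and re-thrown instead of being swallowed, so the client sees the task failure rather than a later NullPointerException.

```scala
// Minimal sketch only -- not the actual Kyuubi source. The helpers below are
// simplified stand-ins that mirror the names used in the diff snippet above.
class KyuubiSQLException(msg: String) extends RuntimeException(msg)

object RethrowSketch {
  private val statementId = "stmt-0001"
  @volatile private var operationException: Option[KyuubiSQLException] = None

  private def warn(msg: String): Unit = Console.err.println(s"WARN  $msg")
  private def setOperationException(ke: KyuubiSQLException): Unit =
    operationException = Some(ke)

  // Fetch the next batch from a lazily evaluated iterator. If the iterator fails
  // while the operation is already in a terminal state (e.g. a task failure that
  // only surfaces now because the job was submitted lazily), record the error
  // and re-throw it instead of swallowing it.
  def fetchNextBatch(rows: Iterator[String], inTerminalState: Boolean): Seq[String] =
    try rows.take(10).toSeq
    catch {
      case e: Exception if inTerminalState =>
        val errMsg = e.getMessage
        warn(s"Ignore exception in terminal state with $statementId: $errMsg")
        val ke = new KyuubiSQLException(errMsg)
        setOperationException(ke)
        throw ke // always re-throw, as in the change under review
    }
}
```

Presumably, before this change the catch branch only logged the error, and the missing result row set later surfaced as the NullPointerException shown in the server log below.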
Codecov Report
@@ Coverage Diff @@
## master #4711 +/- ##
============================================
- Coverage 58.05% 57.98% -0.08%
Complexity 13 13
============================================
Files 580 580
Lines 32231 32232 +1
Branches 4302 4302
============================================
- Hits 18712 18690 -22
- Misses 11729 11744 +15
- Partials 1790 1798 +8
... and 12 files with indirect coverage changes
Thanks, merged to master/1.7
JDBC client should catch task failed exception instead of NPE in the incremental mode

### _Why are the changes needed?_

Since the job is lazily submitted in the incremental mode, the engine should not catch the task failed exception even though the operation is in the terminal state.

Before this PR:

```
0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
+---------------------------------------+--------+
|                  key                  | value  |
+---------------------------------------+--------+
| kyuubi.operation.incremental.collect  | true   |
+---------------------------------------+--------+
0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
Error: (state=,code=0)
0: jdbc:hive2://0.0.0.0:10009/>
```

kyuubi server log

```
2023-04-14 18:47:50.185 ERROR org.apache.kyuubi.server.KyuubiTBinaryFrontendService: Error fetching results:
java.lang.NullPointerException: null
  at org.apache.kyuubi.server.BackendServiceMetric.$anonfun$fetchResults$1(BackendServiceMetric.scala:191) ~[classes/:?]
  at org.apache.kyuubi.metrics.MetricsSystem$.timerTracing(MetricsSystem.scala:111) ~[classes/:?]
  at org.apache.kyuubi.server.BackendServiceMetric.fetchResults(BackendServiceMetric.scala:187) ~[classes/:?]
  at org.apache.kyuubi.server.BackendServiceMetric.fetchResults$(BackendServiceMetric.scala:182) ~[classes/:?]
  at org.apache.kyuubi.server.KyuubiServer$$anon$1.fetchResults(KyuubiServer.scala:147) ~[classes/:?]
  at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530) [classes/:?]
```

After this PR:

```
0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
+---------------------------------------+--------+
|                  key                  | value  |
+---------------------------------------+--------+
| kyuubi.operation.incremental.collect  | true   |
+---------------------------------------+--------+
0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (0.0.0.0 executor driver): java.lang.RuntimeException: custom error message
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
  at org.apache.spark.rdd.RDD.collectPartition$1(RDD.scala:1036)
  at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3(RDD.scala:1038)
  at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3$adapted(RDD.scala:1038)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.kyuubi.operation.IterableFetchIterator.hasNext(FetchIterator.scala:97)
  at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
  at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
  at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$getNextRowSet$1(SparkOperation.scala:265)
  at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
  at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
  at org.apache.kyuubi.engine.spark.operation.SparkOperation.getNextRowSet(SparkOperation.scala:243)
  at org.apache.kyuubi.operation.OperationManager.getOperationNextRowSet(OperationManager.scala:141)
  at org.apache.kyuubi.session.AbstractSession.fetchResults(AbstractSession.scala:240)
  at org.apache.kyuubi.service.AbstractBackendService.fetchResults(AbstractBackendService.scala:214)
  at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530)
  at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
  at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
  at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
  at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
  at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
  at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: custom error message
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  ... 3 more (state=,code=0)
0: jdbc:hive2://0.0.0.0:10009/>
```

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4711 from cfmcgrady/incremental-show-error-msg.

Closes #4711

66bb527 [Fu Chen] JDBC client should catch task failed exception in the incremental mode

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit db46b5b)
Signed-off-by: Cheng Pan <chengpan@apache.org>
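To illustrate the "lazily submitted" point above: in incremental mode the engine walks the result through an iterator (the stack trace above passes through `RDD.toLocalIterator`), so Spark only runs jobs partition by partition while rows are fetched, and a task failure surfaces at fetch time rather than at statement-execution time. A small standalone Spark sketch of that behavior follows; it assumes a local SparkSession and is not the Kyuubi engine code itself.

```scala
import org.apache.spark.sql.SparkSession

// Standalone illustration (assumes a local Spark environment; not Kyuubi code).
// raise_error is a built-in Spark SQL function, as used in the beeline session above.
object IncrementalCollectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    try {
      val df = spark.range(1).selectExpr("raise_error('custom error message')")

      // Eager collect would run the job (and fail) right here:
      // df.collect()

      // Incremental-style collect: no job has run yet; the task failure is only
      // thrown while the iterator is consumed, i.e. at "fetch" time.
      val it = df.toLocalIterator()
      while (it.hasNext) println(it.next()) // org.apache.spark.SparkException surfaces here
    } finally {
      spark.stop()
    }
  }
}
```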
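Along the lines of the "add test cases" item in the checklist above, a hypothetical end-to-end check might look like the sketch below. It uses plain JDBC against a locally running Kyuubi server; the URL, query, and assertion are illustrative assumptions, not a test taken from this PR.

```scala
import java.sql.{DriverManager, SQLException}

// Hypothetical verification sketch: in incremental mode the JDBC client should
// receive the task failure message instead of hitting a server-side NPE.
// Requires the Hive JDBC driver on the classpath and a Kyuubi server at the
// assumed address below.
object IncrementalErrorPropagationCheck {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://0.0.0.0:10009/")
    try {
      val stmt = conn.createStatement()
      stmt.execute("set kyuubi.operation.incremental.collect=true")
      val caught =
        try {
          val rs = stmt.executeQuery("SELECT raise_error('custom error message')")
          while (rs.next()) {} // in incremental mode the failure surfaces while fetching
          None
        } catch { case e: SQLException => Some(e) }
      assert(
        caught.exists(_.getMessage.contains("custom error message")),
        "expected the task failure message to reach the JDBC client")
    } finally {
      conn.close()
    }
  }
}
```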