-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-6964][SQL] Support Cancellation in the Thrift Server #6207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
819ae03
184ec35
04142c3
3d8ebf8
341885b
eb3e385
380480f
7bfa2a7
687c113
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver | |
|
||
import java.io.File | ||
import java.net.URL | ||
import java.sql.{Date, DriverManager, Statement} | ||
import java.nio.charset.StandardCharsets | ||
import java.sql.{Date, DriverManager, SQLException, Statement} | ||
|
||
import scala.collection.mutable.ArrayBuffer | ||
import scala.concurrent.duration._ | ||
import scala.concurrent.{Await, Promise} | ||
import scala.concurrent.{Await, Promise, future} | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import scala.sys.process.{Process, ProcessLogger} | ||
import scala.util.{Random, Try} | ||
|
||
|
@@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { | |
} | ||
) | ||
} | ||
|
||
test("test jdbc cancel") { | ||
withJdbcStatement { statement => | ||
val queries = Seq( | ||
"DROP TABLE IF EXISTS test_map", | ||
"CREATE TABLE test_map(key INT, value STRING)", | ||
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") | ||
|
||
queries.foreach(statement.execute) | ||
|
||
val largeJoin = "SELECT COUNT(*) FROM test_map " + | ||
List.fill(10)("join test_map").mkString(" ") | ||
val f = future { Thread.sleep(100); statement.cancel(); } | ||
val e = intercept[SQLException] { | ||
statement.executeQuery(largeJoin) | ||
} | ||
assert(e.getMessage contains "cancelled") | ||
Await.result(f, 3.minute) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why three minutes? This seems arbitrary. |
||
|
||
// cancel is a noop | ||
statement.executeQuery("SET spark.sql.hive.thriftServer.async=false") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Async should probably be re-enabled after this test, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, if the settings persist beyond the closing of the sql statement. |
||
val sf = future { Thread.sleep(100); statement.cancel(); } | ||
val smallJoin = "SELECT COUNT(*) FROM test_map " + | ||
List.fill(4)("join test_map").mkString(" ") | ||
val rs1 = statement.executeQuery(smallJoin) | ||
Await.result(sf, 3.minute) | ||
rs1.next() | ||
assert(rs1.getInt(1) === math.pow(5, 5)) | ||
rs1.close() | ||
|
||
val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map") | ||
rs2.next() | ||
assert(rs2.getInt(1) === 5) | ||
rs2.close() | ||
} | ||
} | ||
} | ||
|
||
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed off-line we should look at doing this without a sleep, but I don't think that needs to block merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HiveStatement does not support setQueryTimeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this is the cause of timeouts / flakiness in this suite: on a slow machine, it's possible that the timeout might expire before the query begins executing, which would cause us to hang indefinitely while waiting for the query to complete.
I'm fixing this in my own PR, but just wanted to flag this here as being a problem so that we don't do this again in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, another problem: this probably shouldn't be using the global execution context, since its thread pool could be tied up by other uses and prevent this future from being scheduled in a timely manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, +1 for a more reliable way to trigger cancel after submitting the long execution.