
[SPARK-30345][SQL] Fix intermittent test failure (ConnectException) on ThriftServerQueryTestSuite/ThriftServerWithSparkContextSuite #27001


Closed
HeartSaVioR wants to merge 2 commits into apache:master from HeartSaVioR:SPARK-30345

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch fixes an intermittent test failure in ThriftServerQueryTestSuite/ThriftServerWithSparkContextSuite, where querying the Thrift server fails with a ConnectException.
(https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115646/testReport/)

The relevant unit test log messages are as follows:

```
19/12/23 13:33:01.875 pool-1-thread-1 INFO AbstractService: Service:ThriftBinaryCLIService is started.
19/12/23 13:33:01.875 pool-1-thread-1 INFO AbstractService: Service:HiveServer2 is started.
...
19/12/23 13:33:01.888 pool-1-thread-1 INFO ThriftServerWithSparkContextSuite: HiveThriftServer2 started successfully
...
19/12/23 13:33:01.909 pool-1-thread-1-ScalaTest-running-ThriftServerWithSparkContextSuite INFO ThriftServerWithSparkContextSuite:

===== TEST OUTPUT FOR o.a.s.sql.hive.thriftserver.ThriftServerWithSparkContextSuite: 'SPARK-29911: Uncache cached tables when session closed' =====

...
19/12/23 13:33:02.017 pool-1-thread-1-ScalaTest-running-ThriftServerWithSparkContextSuite INFO Utils: Supplied authorities: localhost:15441
19/12/23 13:33:02.018 pool-1-thread-1-ScalaTest-running-ThriftServerWithSparkContextSuite INFO Utils: Resolved authority: localhost:15441
19/12/23 13:33:02.078 HiveServer2-Background-Pool: Thread-213 INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
19/12/23 13:33:02.078 HiveServer2-Background-Pool: Thread-213 INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
19/12/23 13:33:02.121 pool-1-thread-1-ScalaTest-running-ThriftServerWithSparkContextSuite WARN HiveConnection: Failed to connect to localhost:15441
19/12/23 13:33:02.124 pool-1-thread-1-ScalaTest-running-ThriftServerWithSparkContextSuite INFO ThriftServerWithSparkContextSuite:

===== FINISHED o.a.s.sql.hive.thriftserver.ThriftServerWithSparkContextSuite: 'SPARK-29911: Uncache cached tables when session closed' =====

19/12/23 13:33:02.143 Thread-35 INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 15441 with 5...500 worker threads
19/12/23 13:33:02.327 pool-1-thread-1 INFO HiveServer2: Shutting down HiveServer2
19/12/23 13:33:02.328 pool-1-thread-1 INFO ThriftCLIService: Thrift server has stopped
```

(Here the error is logged as `WARN HiveConnection: Failed to connect to localhost:15441` - the actual stack trace can be seen in the Jenkins test summary.)

The reason for the test failure: Thrift(Binary|Http)CLIService prepares and launches the service asynchronously (in a new thread), but the suites do not wait for it to complete and just start running tests, which ends up in a race condition.

This can easily be reproduced by adding an artificial sleep in `ThriftBinaryCLIService.run()` here:
https://github.com/apache/spark/blob/ba3f6330dd2b6054988f1f6f0ffe014fc4969088/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java#L49

(Note that the `sleep` should be added before the server socket is initialized, e.g. at line 57.)

This patch changes the test initialization logic to execute a simple query and wait until the service is available. The patch also refactors the code so that the change applies to both ThriftServerQueryTestSuite and ThriftServerWithSparkContextSuite easily.
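
For illustration, here is a minimal sketch of this kind of wait-until-ready check using ScalaTest's `eventually` (the same pattern visible in the review snippet further down); the helper name, JDBC URL, and credentials are placeholders, not the exact code of the patch:

```scala
import java.sql.DriverManager

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Hypothetical helper for illustration; the actual patch wires this kind of
// check into the shared trait's setup, deriving the port from the started server.
def waitUntilThriftServerIsReady(port: Int): Unit = {
  eventually(timeout(30.seconds), interval(1.seconds)) {
    // Until ThriftBinaryCLIService's server socket is actually listening,
    // getConnection fails (ultimately with a ConnectException) and eventually() retries.
    val conn = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/", "anonymous", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1")
      assert(rs.next() && rs.getInt(1) == 1)
    } finally {
      conn.close()
    }
  }
}
```

A helper along these lines can be called from the shared trait's setup before any test runs, so tests only start once the service actually accepts connections.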

Why are the changes needed?

This patch fixes the intermittent failure observed here:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115646/testReport/

Does this PR introduce any user-facing change?

No

How was this patch tested?

Artificially made the test fail consistently (by the approach described above), and confirmed the patch fixed the test.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Dec 24, 2019

Here's the diff file to reproduce the issue:
SPARK-30345-reproducerThriftBinaryCLIService.diff.zip

@HeartSaVioR
Contributor Author

HeartSaVioR commented Dec 24, 2019

This patch may also resolve SPARK-28883, where the reported error message is a ConnectException, but given there might be some failures not relevant to this, I'd defer to reviewers to judge whether this patch resolves SPARK-28883 entirely or only partially.

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115730 has finished for PR 27001 at commit 8309fb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait SharedThriftServer extends QueryTest with SharedSparkSession
  • class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer
  • class ThriftServerWithSparkContextSuite extends SharedThriftServer

@HeartSaVioR
Contributor Author

ThriftServerQueryTestSuite has been touched by various people, but let me start by cc'ing some committers.
cc. @wangyum @cloud-fan @maropu @HyukjinKwon

```scala
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port.toString)
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)

eventually(timeout(30.seconds), interval(1.seconds)) {
```
Member

Can you leave some comments here?

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

trait SharedThriftServer extends QueryTest with SharedSparkSession {
```
Member

How about just moving QueryTest into each test suite, since they don't depend on each other? (A sketch of what this amounts to follows below.)
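
For reference, a rough sketch of what this suggestion amounts to, pieced together only from the public classes listed in the SparkQA build summaries for commits 8309fb6 and 160ca1a; bodies are elided and not the actual implementation:

```scala
import org.apache.spark.sql.SQLQueryTestSuite
import org.apache.spark.sql.test.SharedSparkSession

// Before (build #115730): trait SharedThriftServer extends QueryTest with SharedSparkSession
// After (build #115810): the shared trait only extends SharedSparkSession, and each
// suite mixes in whatever testing base it needs on top of it.
trait SharedThriftServer extends SharedSparkSession {
  /* starts/stops HiveThriftServer2 against the shared SparkContext */
}

class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer { /* ... */ }

class ThriftServerWithSparkContextSuite extends SharedThriftServer { /* ... */ }
```

This keeps the shared trait focused on the Thrift server lifecycle while each suite chooses its own testing base.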

@maropu
Member

maropu commented Dec 26, 2019

The change looks reasonable to me.

@HeartSaVioR
Contributor Author

Thanks for reviewing! I've addressed the review comments. Please take another look. Thanks in advance!

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115810 has finished for PR 27001 at commit 160ca1a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait SharedThriftServer extends SharedSparkSession

@cloud-fan
Contributor

The fix itself LGTM, but do you know why Thrift(Binary|Http)CLIService prepares and launches the service asynchronously? It seems end users could also hit this problem if they run queries right after the server is started.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Dec 27, 2019

> The fix itself LGTM, but do you know why Thrift(Binary|Http)CLIService prepares and launches the service asynchronously? It seems end users could also hit this problem if they run queries right after the server is started.

I don't know the history, but if I read the commit history correctly, the codebase of sql/hive-thriftserver came from Hive and we have only made minor fixes, so this behavior is basically inherited from Hive.

Here we're embedding the Hive server in test suites, so we get a chance to hit the race condition, but in practice it will most likely run as a standalone app, where end users would just assume the server is not ready yet and rerun the query when it fails for this reason. (And the chance is rare, as it's hard to reproduce without adding an artificial sleep; the window is probably under 1 second.)

Btw, there's another set of test suites which runs the Hive server as an external standalone app - those shouldn't encounter this issue, but they have longer startup and shutdown times:

```scala
abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll with Logging {
  def mode: ServerMode.Value

  private val CLASS_NAME = HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")
  private val LOG_FILE_MARK = s"starting $CLASS_NAME, logging to "

  protected val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
  protected val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)

  private var listeningPort: Int = _
  protected def serverPort: Int = listeningPort

  protected val hiveConfList = "a=avalue;b=bvalue"
  protected val hiveVarList = "c=cvalue;d=dvalue"
  protected def user = System.getProperty("user.name")

  protected var warehousePath: File = _
  protected var metastorePath: File = _
  protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"

  private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
  protected var logPath: File = _
  protected var operationLogPath: File = _
  protected var lScratchDir: File = _
  private var logTailingProcess: Process = _
  private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

  protected def extraConf: Seq[String] = Nil

  protected def serverStartCommand(port: Int) = {
    val portConf = if (mode == ServerMode.binary) {
      ConfVars.HIVE_SERVER2_THRIFT_PORT
    } else {
      ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
    }

    val driverClassPath = {
      // Writes a temporary log4j.properties and prepend it to driver classpath, so that it
      // overrides all other potential log4j configurations contained in other dependency jar files.
      val tempLog4jConf = Utils.createTempDir().getCanonicalPath

      Files.write(
        """log4j.rootCategory=DEBUG, console
          |log4j.appender.console=org.apache.log4j.ConsoleAppender
          |log4j.appender.console.target=System.err
          |log4j.appender.console.layout=org.apache.log4j.PatternLayout
          |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
        """.stripMargin,
        new File(s"$tempLog4jConf/log4j.properties"),
        StandardCharsets.UTF_8)

      tempLog4jConf
    }

    s"""$startScript
       | --master local
       | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
       | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
       | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
       | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
       | --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
       | --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
       | --hiveconf $portConf=$port
       | --driver-class-path $driverClassPath
       | --driver-java-options -Dlog4j.debug
       | --conf spark.ui.enabled=false
       | ${extraConf.mkString("\n")}
     """.stripMargin.split("\\s+").toSeq
  }

  /**
   * String to scan for when looking for the thrift binary endpoint running.
   * This can change across Hive versions.
   */
  val THRIFT_BINARY_SERVICE_LIVE = "Starting ThriftBinaryCLIService on port"

  /**
   * String to scan for when looking for the thrift HTTP endpoint running.
   * This can change across Hive versions.
   */
  val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"

  val SERVER_STARTUP_TIMEOUT = 3.minutes

  private def startThriftServer(port: Int, attempt: Int) = {
    warehousePath = Utils.createTempDir()
    warehousePath.delete()
    metastorePath = Utils.createTempDir()
    metastorePath.delete()
    operationLogPath = Utils.createTempDir()
    operationLogPath.delete()
    lScratchDir = Utils.createTempDir()
    lScratchDir.delete()
    logPath = null
    logTailingProcess = null

    val command = serverStartCommand(port)

    diagnosisBuffer ++=
      s"""
         |### Attempt $attempt ###
         |HiveThriftServer2 command line: $command
         |Listening port: $port
         |System user: $user
       """.stripMargin.split("\n")

    logInfo(s"Trying to start HiveThriftServer2: port=$port, mode=$mode, attempt=$attempt")

    logPath = {
      val lines = Utils.executeAndGetOutput(
        command = command,
        extraEnvironment = Map(
          // Disables SPARK_TESTING to exclude log4j.properties in test directories.
          "SPARK_TESTING" -> "0",
          // But set SPARK_SQL_TESTING to make spark-class happy.
          "SPARK_SQL_TESTING" -> "1",
          // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be
          // started at a time, which is not Jenkins friendly.
          "SPARK_PID_DIR" -> pidDir.getCanonicalPath),
        redirectStderr = true)

      logInfo(s"COMMAND: $command")
      logInfo(s"OUTPUT: $lines")
      lines.split("\n").collectFirst {
        case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
      }.getOrElse {
        throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
      }
    }

    val serverStarted = Promise[Unit]()

    // Ensures that the following "tail" command won't fail.
    logPath.createNewFile()
    val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE)

    logTailingProcess = {
      val command = s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}".split(" ")
      // Using "-n +0" to make sure all lines in the log file are checked.
      val builder = new ProcessBuilder(command: _*)
      val captureOutput = (line: String) => diagnosisBuffer.synchronized {
        diagnosisBuffer += line
        successLines.foreach { r =>
          if (line.contains(r)) {
            serverStarted.trySuccess(())
          }
        }
      }
      val process = builder.start()

      new ProcessOutputCapturer(process.getInputStream, captureOutput).start()
      new ProcessOutputCapturer(process.getErrorStream, captureOutput).start()
      process
    }

    ThreadUtils.awaitResult(serverStarted.future, SERVER_STARTUP_TIMEOUT)
  }

  private def stopThriftServer(): Unit = {
    // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
    Utils.executeAndGetOutput(
      command = Seq(stopScript),
      extraEnvironment = Map("SPARK_PID_DIR" -> pidDir.getCanonicalPath))
    Thread.sleep(3.seconds.toMillis)

    warehousePath.delete()
    warehousePath = null

    metastorePath.delete()
    metastorePath = null

    operationLogPath.delete()
    operationLogPath = null

    lScratchDir.delete()
    lScratchDir = null

    Option(logPath).foreach(_.delete())
    logPath = null

    Option(logTailingProcess).foreach(_.destroy())
    logTailingProcess = null
  }

  private def dumpLogs(): Unit = {
    logError(
      s"""
         |=====================================
         |HiveThriftServer2Suite failure output
         |=====================================
         |${diagnosisBuffer.mkString("\n")}
         |=========================================
         |End HiveThriftServer2Suite failure output
         |=========================================
       """.stripMargin)
  }

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    // Chooses a random port between 10000 and 19999
    listeningPort = 10000 + Random.nextInt(10000)
    diagnosisBuffer.clear()

    // Retries up to 3 times with different port numbers if the server fails to start
    (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
      started.orElse {
        listeningPort += 1
        stopThriftServer()
        Try(startThriftServer(listeningPort, attempt))
      }
    }.recover {
      case cause: Throwable =>
        dumpLogs()
        throw cause
    }.get

    logInfo(s"HiveThriftServer2 started successfully")
  }

  override protected def afterAll(): Unit = {
    try {
      stopThriftServer()
      logInfo("HiveThriftServer2 stopped")
    } finally {
      super.afterAll()
    }
  }
}
```

@cloud-fan
Contributor

thanks, merging to master!

cloud-fan closed this in 7adf886 on Dec 27, 2019
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

HeartSaVioR deleted the SPARK-30345 branch December 27, 2019 07:39
@juliuszsompolski
Contributor

Thank you very much @HeartSaVioR for this trait cleanup! I missed this PR over Christmas and just now saw the new code. This trait, which starts the Thrift server with an existing SparkContext instead of as a separate process, can be very useful for increasing thriftserver test coverage in other places - e.g. adding tests that validate the state of the thriftserver listener, session manager, query cancellation, etc. - assertions for which the test needs to touch thriftserver internals.

fqaiser94 pushed a commit to fqaiser94/spark that referenced this pull request Mar 30, 2020
Closes apache#27001 from HeartSaVioR/SPARK-30345.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>