[SPARK-3809][SQL] Fixes test suites in hive-thriftserver #2675

Closed · wants to merge 6 commits
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -62,8 +62,11 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

   def captureOutput(source: String)(line: String) {
     buffer += s"$source> $line"
+    // If we haven't found all expected answers...
     if (next.get() < expectedAnswers.size) {
+      // If another expected answer is found...
       if (line.startsWith(expectedAnswers(next.get()))) {
+        // If all expected answers have been found...
         if (next.incrementAndGet() == expectedAnswers.size) {
           foundAllExpectedAnswers.trySuccess(())
         }
@@ -77,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {

     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL CLI process exit value: $exitValue")
+      foundAllExpectedAnswers.tryFailure(
+        new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
     }
 
     try {
@@ -98,6 +102,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
            |End CliSuite failure output
            |===========================
          """.stripMargin, cause)
+        throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
@@ -109,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-    runCliWithin(1.minute)(
+    runCliWithin(3.minute)(
       "CREATE TABLE hive_test(key INT, val STRING);"
         -> "OK",
       "SHOW TABLES;"
@@ -120,7 +125,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
-> "Time taken: ",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test"
"DROP TABLE hive_test;"
-> "Time taken: "
)
}
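Note: the CliSuite hunks above all serve one pattern. A single `Promise` is raced between "all expected answers matched" (`trySuccess`) and "the CLI process exited" (`tryFailure` wrapping a `SparkException`), and `Await.result` either returns or rethrows the recorded cause, so a crashed CLI now fails the test instead of merely being logged. Below is a minimal standalone sketch of that pattern; the object and method names (`PromiseRaceSketch`, `onLine`) are illustrative, not from the PR:

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

object PromiseRaceSketch extends App {
  val expectedAnswers = Seq("OK", "Time taken: ")
  val next = new AtomicInteger(0)
  val foundAllExpectedAnswers = Promise[Unit]()

  // Mirrors captureOutput: advance the cursor when the next expected answer
  // appears, and complete the promise once every answer has been seen.
  def onLine(line: String): Unit = {
    if (next.get() < expectedAnswers.size &&
        line.startsWith(expectedAnswers(next.get())) &&
        next.incrementAndGet() == expectedAnswers.size) {
      foundAllExpectedAnswers.trySuccess(())
    }
  }

  onLine("OK")
  onLine("Time taken: 1.234 seconds")

  // Stand-in for the exit-value watcher: the promise is already completed,
  // so tryFailure is a no-op that returns false rather than overwriting.
  val failed = foundAllExpectedAnswers.tryFailure(new RuntimeException("CLI exited"))
  println(s"failure won the race: $failed") // false

  // Await.result would rethrow the recorded exception had tryFailure won.
  Await.result(foundAllExpectedAnswers.future, 10.seconds)
  println("all expected answers found")
}
```

Whichever party completes the promise first wins; the `try*` variants make the race safe because a second completion is ignored instead of throwing.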

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -17,17 +17,17 @@

 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-
 import java.io.File
 import java.net.ServerSocket
 import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Promise}
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
+
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.scalatest.FunSuite
@@ -41,25 +41,25 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
-  private val listeningHost = "localhost"
-  private val listeningPort = {
-    // Let the system to choose a random available port to avoid collision with other parallel
-    // builds.
-    val socket = new ServerSocket(0)
-    val port = socket.getLocalPort
-    socket.close()
-    port
-  }
-
-  private val warehousePath = getTempFilePath("warehouse")
-  private val metastorePath = getTempFilePath("metastore")
-  private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
-    val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+  def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+    val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+
+    val warehousePath = getTempFilePath("warehouse")
+    val metastorePath = getTempFilePath("metastore")
+    val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+    val listeningHost = "localhost"
+    val listeningPort = {
+      // Let the system to choose a random available port to avoid collision with other parallel
+      // builds.
+      val socket = new ServerSocket(0)
+      val port = socket.getLocalPort
+      socket.close()
+      port
+    }
 
     val command =
-      s"""$serverScript
+      s"""$startScript
         | --master local
         | --hiveconf hive.root.logger=INFO,console
         | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
@@ -68,29 +68,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
       """.stripMargin.split("\\s+").toSeq
 
-    val serverStarted = Promise[Unit]()
+    val serverRunning = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val LOGGING_MARK =
+      s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
+    var logTailingProcess: Process = null
+    var logFilePath: String = null
 
-    def captureOutput(source: String)(line: String) {
-      buffer += s"$source> $line"
+    def captureLogOutput(line: String): Unit = {
+      buffer += line
       if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+        serverRunning.success(())
       }
     }
 
-    val process = Process(command).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    Future {
-      val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+    def captureThriftServerOutput(source: String)(line: String): Unit = {
+      if (line.startsWith(LOGGING_MARK)) {
+        logFilePath = line.drop(LOGGING_MARK.length).trim
+        // Ensure that the log file is created so that the `tail' command won't fail
+        Try(new File(logFilePath).createNewFile())
+        logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
+          .run(ProcessLogger(captureLogOutput, _ => ()))
+      }
    }
 
+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+      captureThriftServerOutput("stdout"),
+      captureThriftServerOutput("stderr")))
+
     val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
     val user = System.getProperty("user.name")
 
     try {
-      Await.result(serverStarted.future, timeout)
+      Await.result(serverRunning.future, timeout)
 
       val connection = DriverManager.getConnection(jdbcUri, user, "")
       val statement = connection.createStatement()
@@ -122,10 +133,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
           |End HiveThriftServer2Suite failure output
           |=========================================
         """.stripMargin, cause)
+        throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
-      process.destroy()
+      Process(stopScript).run().exitValue()
+      // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
+      Thread.sleep(3.seconds.toMillis)
+      Option(logTailingProcess).map(_.destroy())
+      Option(logFilePath).map(new File(_).delete())
     }
   }

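Note: two small tricks in the HiveThriftServer2Suite changes are easy to miss: binding a `ServerSocket` to port 0 asks the OS for a currently unused port (avoiding collisions between parallel builds), and the `Process(command, cwd, extraEnv*)` overload of `scala.sys.process` merges extra variables such as the `SPARK_TESTING` reset into the child's environment. A self-contained sketch of both follows; the object name and the `/usr/bin/env` command are illustrative only:

```scala
import java.net.ServerSocket

import scala.sys.process.{Process, ProcessLogger}

object ProcessEnvSketch extends App {
  // Bind port 0 so the OS picks an unused port, then release it for the
  // server under test to bind shortly afterwards.
  val listeningPort = {
    val socket = new ServerSocket(0)
    val port = socket.getLocalPort
    socket.close()
    port
  }
  println(s"OS-assigned port: $listeningPort")

  // The (command, cwd, extraEnv*) overload injects variables into the
  // child's environment; `env` prints them so the override is visible.
  val exitValue = Process(Seq("/usr/bin/env"), None, "SPARK_TESTING" -> "0")
    .run(ProcessLogger(line => if (line.startsWith("SPARK_TESTING=")) println(line)))
    .exitValue()
  println(s"env exit value: $exitValue")
}
```

There is a small race to be aware of with the port trick, which the suite accepts: the port is free when the socket closes, but another process could grab it before the Thrift server binds.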