
[SPARK-14994][SQL] Remove execution hive from HiveSessionState #12770

Status: Closed · wants to merge 14 commits
@@ -34,6 +34,8 @@ trait CatalystConf {

def runSQLonFile: Boolean

def warehousePath: String

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -52,5 +54,6 @@ case class SimpleCatalystConf(
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
runSQLonFile: Boolean = true)
runSQLonFile: Boolean = true,
warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
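
For context, the new field threads the warehouse location through the analyzer-side conf. A minimal sketch of how a caller might override the new default (SimpleCatalystConf's leading caseSensitiveAnalysis parameter is assumed from the full signature, which this hunk does not show; the path is illustrative):

// Hypothetical override of the new warehousePath default.
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, warehousePath = "/tmp/test-warehouse")
assert(conf.warehousePath == "/tmp/test-warehouse")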
@@ -125,7 +125,7 @@ class SessionCatalog(
}

def getDefaultDBPath(db: String): String = {
System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
new Path(new Path(conf.warehousePath), db + ".db").toString
}

// ----------------------------------------------------------------------------
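The Hadoop Path(parent, child) constructor joins and normalizes the two components, so the default database directory now lands under the configured warehouse root instead of java.io.tmpdir. A quick illustration (the database name "mydb" is made up):

import org.apache.hadoop.fs.Path

// Joining a warehouse root and a database directory name:
new Path(new Path("/user/hive/warehouse"), "mydb" + ".db").toString
// => "/user/hive/warehouse/mydb.db"
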
@@ -52,6 +52,11 @@ object SQLConf {

}

val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createWithDefault("${system:user.dir}/spark-warehouse")
Review comment (Contributor):
@rxin I think the default for this should be something else. This will fail things like https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58168/


val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
.internal()
.doc("The max number of iterations the optimizer and analyzer runs.")
@@ -645,6 +650,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

def warehousePath: String = {
getConf(WAREHOUSE_PATH).replace("${system:user.dir}", System.getProperty("user.dir"))
}

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
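The getter performs one literal token replacement rather than general variable substitution: only the ${system:user.dir} token is rewritten, using the JVM's user.dir system property. A sketch of the resolution (the home directory is illustrative):

// Assuming System.getProperty("user.dir") == "/home/alice/project":
val raw = "${system:user.dir}/spark-warehouse"
val resolved = raw.replace("${system:user.dir}", System.getProperty("user.dir"))
// resolved == "/home/alice/project/spark-warehouse"
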
@@ -17,11 +17,13 @@

package org.apache.spark.sql.internal

import java.io.File
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
@@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
hadoopConf
}

// Automatically extract `spark.sql.*` entries and put it in our SQLConf
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))

lazy val experimentalMethods = new ExperimentalMethods

/**
@@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new ContinuousQueryManager(sparkSession)
}

private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader

// Automatically extract `spark.sql.*` entries and put them in our SQLConf
// We need to call this after all of the vals above have been initialized.
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
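
Moving the setConf call to the bottom matters because a Scala class body initializes top to bottom; if applying the conf triggers code that reads a val declared later, that val is still null. A self-contained sketch of the hazard (not Spark code):

class InitOrder {
  show()                                 // runs before `suffix` is assigned
  val suffix: String = "!"
  private def show(): Unit = println("suffix = " + suffix)   // prints "suffix = null"
}
new InitOrder()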

// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -180,6 +185,17 @@ private[sql] class SessionState(sparkSession: SparkSession) {

def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)

val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(jarClassLoader)
}

/**
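The scheme check in addJar is what distinguishes a bare local path from a qualified URL before the jar is handed to the classloader. A small demonstration of the two branches (the jar paths are illustrative):

import org.apache.hadoop.fs.Path

// A bare filesystem path carries no scheme, hence the java.io.File round-trip:
new Path("/tmp/udfs.jar").toUri.getScheme      // null
// A qualified URL keeps its scheme and can be converted directly:
new Path("file:/tmp/udfs.jar").toUri.getScheme // "file"
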
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.util.MutableURLClassLoader


/**
@@ -44,4 +45,21 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog

/**
* A classloader used to load all user-added jars.
*/
val jarClassLoader = new NonClosableMutableURLClassLoader(
org.apache.spark.util.Utils.getContextOrSparkClassLoader)

}


/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
* This class loader cannot be closed (its `close` method is a no-op).
*/
private[sql] class NonClosableMutableURLClassLoader(parent: ClassLoader)
extends MutableURLClassLoader(Array.empty, parent) {

override def close(): Unit = {}
}
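
Making close() a no-op protects the shared loader: URLClassLoader.close() would otherwise release the jar handles and break class loading for every session that still holds the loader. A hypothetical usage sketch (the jar path is made up):

val loader = new NonClosableMutableURLClassLoader(getClass.getClassLoader)
loader.addURL(new java.io.File("/tmp/udfs.jar").toURI.toURL)
loader.close()   // no-op: classes from udfs.jar remain loadable
Thread.currentThread().setContextClassLoader(loader)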
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -83,91 +84,100 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("Create/Drop Database") {
val catalog = sqlContext.sessionState.catalog
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog

val databaseNames = Seq("db1", "`database`")
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)

sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)

sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
}
}
}
}

test("Create Database - database already exists") {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))

val message = intercept[AnalysisException] {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))

val message = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
}
}
}
}

test("Alter/Describe Database") {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
}
}
}
}
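These tests now pin spark.sql.warehouse.dir explicitly because getDefaultDBPath no longer consults java.io.tmpdir. The withSQLConf helper sets the entry for the duration of the body and restores the previous state afterwards; roughly (a simplified sketch, assuming SQLConf's getConfString/setConfString/unsetConf accessors; the real helper lives in Spark's test utilities and handles multiple key/value pairs):

def withSQLConf(key: String, value: String)(body: => Unit): Unit = {
  val previous = sqlContext.conf.getConfString(key, null)
  sqlContext.conf.setConfString(key, value)
  try body finally {
    if (previous != null) sqlContext.conf.setConfString(key, previous)
    else sqlContext.conf.unsetConf(key)
  }
}
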
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.service.server;

import java.util.Properties;

import org.apache.commons.cli.*;

public class HiveServerServerOptionsProcessor {
private final Options options = new Options();
private org.apache.commons.cli.CommandLine commandLine;
private final String serverName;
private final StringBuilder debugMessage = new StringBuilder();

@SuppressWarnings("static-access")
public HiveServerServerOptionsProcessor(String serverName) {
this.serverName = serverName;
// -hiveconf x=y
options.addOption(OptionBuilder
.withValueSeparator()
.hasArgs(2)
.withArgName("property=value")
.withLongOpt("hiveconf")
.withDescription("Use value for given property")
.create());
// -deregister <versionNumber>
options.addOption(OptionBuilder
.hasArgs(1)
.withArgName("versionNumber")
.withLongOpt("deregister")
.withDescription("Deregister all instances of given version from dynamic service discovery")
.create());
options.addOption(new Option("H", "help", false, "Print help information"));
}

public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) {
try {
commandLine = new GnuParser().parse(options, argv);
// Process --hiveconf
// Get hiveconf param values and set the System property values
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
// save logging message for log4j output later, after log4j is initialized properly
debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
// System.setProperty("hivecli." + propKey, confProps.getProperty(propKey));
System.setProperty(propKey, confProps.getProperty(propKey));
}

// Process --help
if (commandLine.hasOption('H')) {
return new HiveServer2.ServerOptionsProcessorResponse(
new HiveServer2.HelpOptionExecutor(serverName, options));
}

// Process --deregister
if (commandLine.hasOption("deregister")) {
return new HiveServer2.ServerOptionsProcessorResponse(
new HiveServer2.DeregisterOptionExecutor(
commandLine.getOptionValue("deregister")));
}
} catch (ParseException e) {
// Error out & exit - we were not able to parse the args successfully
System.err.println("Error starting HiveServer2 with given arguments: ");
System.err.println(e.getMessage());
System.exit(-1);
}
// Default executor, when no option is specified
return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor());
}
}
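
For reference, the --hiveconf handling leans on commons-cli's value separator: hasArgs(2) plus withValueSeparator() splits each property=value token, and getOptionProperties collects the pairs before they are copied into system properties. A hedged Scala sketch of the same parse (the port property is illustrative):

import org.apache.commons.cli.{GnuParser, OptionBuilder, Options}

val options = new Options()
options.addOption(OptionBuilder.withValueSeparator().hasArgs(2)
  .withArgName("property=value").withLongOpt("hiveconf").create())
val cmd = new GnuParser().parse(options, Array("--hiveconf", "hive.server2.thrift.port=10001"))
val props = cmd.getOptionProperties("hiveconf")
// props.getProperty("hive.server2.thrift.port") == "10001"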