[SPARK-49534][CORE] No longer prepend `sql/hive` and `sql/hive-thriftserver` when `spark-hive_xxx.jar` is not in the classpath

### What changes were proposed in this pull request?
This PR adds two new check conditions to the `launcher.AbstractCommandBuilder#buildClassPath` method:

When `SPARK_PREPEND_CLASSES` is true, the class path of the `sql/hive` module is no longer prepended when `spark-hive_xxx.jar` is not in the classpath. The assumption here is that if `spark-hive_xxx.jar` is not in the classpath, the `-Phive` profile was not used during packaging, and therefore the Hive-related jars (such as hive-exec-xx.jar) should also not be in the classpath. To avoid failures when loading the `DataSourceRegister` SPI implementations under `sql/hive`, `sql/hive` is no longer prepended.

Meanwhile, due to the strong dependency of `sql/hive-thriftserver` on `sql/hive`, the prepend of `sql/hive-thriftserver` is also skipped when `spark-hive_xxx.jar` is not in the classpath. Similarly, if `spark-hive-thriftserver_xxx.jar` is not in the classpath, the `-Phive-thriftserver` profile was not used during packaging, and therefore jars such as hive-cli and hive-beeline should also not be in the classpath. To avoid inelegant startup failures of tools such as spark-sql, `sql/hive-thriftserver` is no longer prepended to the classpath in this scenario either. A stand-alone sketch of the gating logic is shown below.
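
The following is a hypothetical stand-alone demo (not Spark source; the class name and `main` wrapper are illustrative, while the helper mirrors the `isJarAvailable` method added by this commit) showing how the two flags gate the prepend:

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo of the gating logic this PR adds to AbstractCommandBuilder#buildClassPath:
// sql/hive and sql/hive-thriftserver are only prepended when the corresponding
// spark-hive jars exist in the jars directory.
public class PrependGateSketch {
  // Mirrors the new private helper isJarAvailable(jarsDir, jarNamePrefix) in the commit.
  static boolean isJarAvailable(String jarsDir, String jarNamePrefix) {
    if (jarsDir != null) {
      File[] files = new File(jarsDir).listFiles();
      if (files != null) {
        for (File f : files) {
          if (f.getName().startsWith(jarNamePrefix)) {
            return true;
          }
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // e.g. $SPARK_HOME/assembly/target/scala-2.13/jars for a source build (illustrative path)
    String jarsDir = args.length > 0 ? args[0] : null;
    boolean shouldPrePendSparkHive = isJarAvailable(jarsDir, "spark-hive_");
    boolean shouldPrePendSparkHiveThriftServer =
      shouldPrePendSparkHive && isJarAvailable(jarsDir, "spark-hive-thriftserver_");

    List<String> projects = Arrays.asList("sql/core", "sql/hive", "sql/hive-thriftserver");
    for (String project : projects) {
      if (!shouldPrePendSparkHive && project.equals("sql/hive")) {
        continue;  // -Phive not used at package time: do not prepend sql/hive classes
      }
      if (!shouldPrePendSparkHiveThriftServer && project.equals("sql/hive-thriftserver")) {
        continue;  // hive-cli / hive-beeline jars absent: do not prepend sql/hive-thriftserver
      }
      System.out.println("would prepend " + project + "/target/scala-xx/classes");
    }
  }
}
```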

### Why are the changes needed?
To fix some bad cases encountered during development. One of them is as follows:

```
build/sbt clean package
export SPARK_PREPEND_CLASSES=true
bin/spark-shell
```

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.14 (OpenJDK 64-Bit Server VM, Java 17.0.12)
Type in expressions to have them evaluated.
Type :help for more information.
24/09/06 17:27:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.22.200.248:4040
Spark context available as 'sc' (master = local[*], app id = local-1725614875132).
Spark session available as 'spark'.

scala> spark.sql("CREATE TABLE test_table (id BIGINT) USING parquet")
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: org.apache.spark.sql.hive.execution.HiveFileFormat Unable to get public no-arg constructor
  at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586)
  at java.base/java.util.ServiceLoader.getConstructor(ServiceLoader.java:679)
  at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1240)
  at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
  at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
  at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
  at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
  at scala.collection.StrictOptimizedIterableOps.filterImpl(StrictOptimizedIterableOps.scala:225)
  at scala.collection.StrictOptimizedIterableOps.filterImpl$(StrictOptimizedIterableOps.scala:222)
  at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filterImpl(JavaCollectionWrappers.scala:83)
  at scala.collection.StrictOptimizedIterableOps.filter(StrictOptimizedIterableOps.scala:218)
  at scala.collection.StrictOptimizedIterableOps.filter$(StrictOptimizedIterableOps.scala:218)
  at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filter(JavaCollectionWrappers.scala:83)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:727)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableProvider(DataSourceV2Utils.scala:163)
  at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.org$apache$spark$sql$catalyst$analysis$ResolveSessionCatalog$$isV2Provider(ResolveSessionCatalog.scala:666)
  at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:172)
  at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:54)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:386)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:37)
  at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:54)
  at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:48)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:226)
  at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
  at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
  at scala.collection.immutable.List.foldLeft(List.scala:79)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:215)
  at scala.collection.immutable.List.foreach(List.scala:334)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:215)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:234)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:230)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:186)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:230)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:201)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:186)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:186)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:393)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:221)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:92)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:234)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:608)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:234)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:233)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:73)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$3(Dataset.scala:120)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:117)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:562)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:553)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:568)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:578)
  ... 42 elided
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/plan/FileSinkDesc
  at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
  at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3373)
  at java.base/java.lang.Class.getConstructor0(Class.java:3578)
  at java.base/java.lang.Class.getConstructor(Class.java:2271)
  at java.base/java.util.ServiceLoader$1.run(ServiceLoader.java:666)
  at java.base/java.util.ServiceLoader$1.run(ServiceLoader.java:663)
  at java.base/java.security.AccessController.doPrivileged(AccessController.java:569)
  at java.base/java.util.ServiceLoader.getConstructor(ServiceLoader.java:674)
  ... 108 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.plan.FileSinkDesc
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
  ... 116 more
```

The error above occurs because, after #40848, initializing the SPI implementation `org.apache.spark.sql.hive.execution.HiveFileFormat` in the `sql/hive` module requires `org.apache.hadoop.hive.ql.plan.FileSinkDesc`, but in this scenario the relevant jars are not on the classpath. Therefore, this PR opts not to prepend the classpath of `sql/hive` in this specific scenario.
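
For context, `DataSource.lookupDataSource` discovers `DataSourceRegister` implementations through Java's `ServiceLoader`, which constructs each registered provider as the iterator reaches it. A minimal sketch of why a missing Hive class breaks even a parquet-only query (illustrative class name; assumes Spark's sql classes are on the classpath):

```java
import java.util.ServiceLoader;

import org.apache.spark.sql.sources.DataSourceRegister;

// Iterating the providers fails with ServiceConfigurationError as soon as the iterator
// reaches HiveFileFormat if org.apache.hadoop.hive.ql.plan.FileSinkDesc (from hive-exec)
// is missing from the classpath -- even when the query itself only uses parquet.
public class DataSourceRegisterLookupSketch {
  public static void main(String[] args) {
    ServiceLoader<DataSourceRegister> providers = ServiceLoader.load(DataSourceRegister.class);
    for (DataSourceRegister provider : providers) {
      System.out.println(provider.shortName());
    }
  }
}
```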

Another one is as follows:

```
build/sbt clean package -Phive // or build/sbt clean package
export SPARK_PREPEND_CLASSES=true
bin/spark-sql
```

```
bin/spark-sql
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
WARNING: Using incubator modules: jdk.incubator.vector
24/09/09 00:28:26 ERROR SparkSubmit: Failed to load org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/cli/CliDriver
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
	at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:862)
	at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:760)
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:681)
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:639)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:579)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
	at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:992)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:226)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:100)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1136)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1145)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	... 22 more
Failed to load hive class.
You need to build Spark with -Phive and -Phive-thriftserver.
```

The failure above occurred because the build did not use the `-Phive` and `-Phive-thriftserver` profiles, so the classpath lacked the necessary hive-cli related dependencies. Therefore, in this scenario, `sql/hive-thriftserver` should not be prepended to the classpath either.
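
For comparison, packaging with both profiles (as the final error message suggests) keeps the prepend behavior intact:

```
build/sbt clean package -Phive -Phive-thriftserver
export SPARK_PREPEND_CLASSES=true
bin/spark-sql
```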

### Does this PR introduce _any_ user-facing change?
No, this is only for developers.

### How was this patch tested?
1. Pass GitHub Actions
2. Manually verify the aforementioned test scenarios.

The first scenario no longer reports errors:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.14 (OpenJDK 64-Bit Server VM, Java 17.0.12)
Type in expressions to have them evaluated.
Type :help for more information.
24/09/06 17:45:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/06 17:45:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://172.22.200.248:4041
Spark context available as 'sc' (master = local[*], app id = local-1725615924448).
Spark session available as 'spark'.

scala> spark.sql("CREATE TABLE test_table (id BIGINT) USING parquet")
val res0: org.apache.spark.sql.DataFrame = []
```

For the second scenario, spark-sql still fails to start, but the error message is simpler and clearer:

```
bin/spark-sql
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
WARNING: Using incubator modules: jdk.incubator.vector
Error: Failed to load class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48015 from LuciferYang/exclude-sql-hive-prepend.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang committed Sep 9, 2024
1 parent dc3333b commit 23d22b1
Showing 1 changed file with 40 additions and 2 deletions.
AbstractCommandBuilder.java (`launcher` module):

```
@@ -145,6 +145,8 @@ List<String> buildClassPath(String appClassPath) throws IOException {
 
     boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
     boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
+    boolean isTestingSql = "1".equals(getenv("SPARK_SQL_TESTING"));
+    String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting && !isTestingSql);
     if (prependClasses || isTesting) {
       String scala = getScalaVersion();
       List<String> projects = Arrays.asList(
@@ -176,6 +178,9 @@ List<String> buildClassPath(String appClassPath) throws IOException {
             "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
             "assembly.");
         }
+        boolean shouldPrePendSparkHive = isJarAvailable(jarsDir, "spark-hive_");
+        boolean shouldPrePendSparkHiveThriftServer =
+          shouldPrePendSparkHive && isJarAvailable(jarsDir, "spark-hive-thriftserver_");
         for (String project : projects) {
           // Do not use locally compiled class files for Spark server because it should use shaded
           // dependencies.
@@ -185,6 +190,24 @@ List<String> buildClassPath(String appClassPath) throws IOException {
           if (isRemote && "1".equals(getenv("SPARK_SCALA_SHELL")) && project.equals("sql/core")) {
             continue;
           }
+          // SPARK-49534: The assumption here is that if `spark-hive_xxx.jar` is not in the
+          // classpath, then the `-Phive` profile was not used during package, and therefore
+          // the Hive-related jars should also not be in the classpath. To avoid failure in
+          // loading the SPI in `DataSourceRegister` under `sql/hive`, no longer prepend `sql/hive`.
+          if (!shouldPrePendSparkHive && project.equals("sql/hive")) {
+            continue;
+          }
+          // SPARK-49534: Meanwhile, due to the strong dependency of `sql/hive-thriftserver`
+          // on `sql/hive`, the prepend for `sql/hive-thriftserver` will also be excluded
+          // if `spark-hive_xxx.jar` is not in the classpath. On the other hand, if
+          // `spark-hive-thriftserver_xxx.jar` is not in the classpath, then the
+          // `-Phive-thriftserver` profile was not used during package, and therefore,
+          // jars such as hive-cli and hive-beeline should also not be included in the classpath.
+          // To avoid the inelegant startup failures of tools such as spark-sql, in this scenario,
+          // `sql/hive-thriftserver` will no longer be prepended to the classpath.
+          if (!shouldPrePendSparkHiveThriftServer && project.equals("sql/hive-thriftserver")) {
+            continue;
+          }
           addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
             scala));
         }
@@ -205,8 +228,6 @@ List<String> buildClassPath(String appClassPath) throws IOException {
     // Add Spark jars to the classpath. For the testing case, we rely on the test code to set and
     // propagate the test classpath appropriately. For normal invocation, look for the jars
     // directory under SPARK_HOME.
-    boolean isTestingSql = "1".equals(getenv("SPARK_SQL_TESTING"));
-    String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting && !isTestingSql);
     if (jarsDir != null) {
       // Place slf4j-api-* jar first to be robust
       for (File f: new File(jarsDir).listFiles()) {
@@ -265,6 +286,23 @@ private void addToClassPath(Set<String> cp, String entries) {
     }
   }
 
+  /**
+   * Checks if the spark-hive jar file is available in the specified directory.
+   *
+   * @param jarsDir the directory to search for spark-hive jar files
+   * @return true if a file starting with "spark-hive_" is found, false otherwise
+   */
+  private boolean isJarAvailable(String jarsDir, String jarNamePrefix) {
+    if (jarsDir != null) {
+      for (File f : new File(jarsDir).listFiles()) {
+        if (f.getName().startsWith(jarNamePrefix)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   String getScalaVersion() {
     String scala = getenv("SPARK_SCALA_VERSION");
     if (scala != null) {
```
