[SPARK-50102][SQL][CONNECT] Add shims needed for missing public SQL methods

### What changes were proposed in this pull request?
This PR makes the following changes:
- It adds shims for a number of classes exposed in the (classic) SQL interface: `BaseRelation`, `ExperimentalMethods`, `ExecutionListenerManager`, `SharedState`, `SessionState`, `SparkConf`, `SparkSessionExtensions`, `QueryExecution`, and `SQLContext`.
- It adds all public methods involving these classes. In Classic they work as before; in Connect they throw a `SparkUnsupportedOperationException` when used (see the sketch after this list).
- It reduces the visibility of two recently added classes: `DataSourceRegistration` and `UDTFRegistration`.
- It also reorganizes all the shims into a single class.
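For illustration, a minimal sketch of the resulting Connect-side behavior. The connection string is a placeholder, the object name is invented, and the setup assumes a reachable Spark Connect endpoint:

```scala
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.SparkSession

object ShimBehaviorSketch extends App {
  // Placeholder endpoint; any Spark Connect server behaves the same way.
  val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

  try {
    // Compiles against the shared interface, but is a shim in Connect.
    spark.sparkContext
  } catch {
    case e: SparkUnsupportedOperationException =>
      // Error condition defined in error-conditions.json (see the diff below).
      assert(e.getErrorClass == "UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT")
  }
}
```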

Please note that this by no means reflects the final state:
- We intend to support `SQLContext`.
- We intend to support `SparkSession.executeCommand`.
- We are considering removing `ExperimentalMethods`, `SharedState`, and `SessionState` from the public interface.
- For `QueryExecution` and `ExecutionListenerManager`, we are considering adding a similar plan representation that is not tied to Catalyst.

### Why are the changes needed?
We are creating a shared Scala (JVM) SQL interface for both Classic and Connect.

### Does this PR introduce _any_ user-facing change?
It adds public methods to the Connect interface that are currently unusable; calling them throws a `SparkUnsupportedOperationException`.

### How was this patch tested?
I have added tests that check whether the Connect client throws the proper exceptions. A minimal sketch of such a check is shown below.
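A sketch of the kind of assertion involved, assuming a ScalaTest suite with a Connect-backed `spark` session in scope (the method under test is one example; the actual suite covers all the shims):

```scala
import org.apache.spark.SparkUnsupportedOperationException

// Inside a ScalaTest suite body with a Connect `spark` session available:
val e = intercept[SparkUnsupportedOperationException] {
  spark.sessionState
}
assert(e.getErrorClass == "UNSUPPORTED_CONNECT_FEATURE.SESSION_SESSION_STATE")
```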

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48687 from hvanhovell/SPARK-50102.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
hvanhovell committed Nov 10, 2024
1 parent e490dd7 commit 655061a
Showing 19 changed files with 473 additions and 207 deletions.
58 changes: 58 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -4930,6 +4930,64 @@
     },
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_CONNECT_FEATURE" : {
+    "message" : [
+      "Feature is not supported in Spark Connect:"
+    ],
+    "subClass" : {
+      "DATASET_QUERY_EXECUTION" : {
+        "message" : [
+          "Access to the Dataset Query Execution. This is server side developer API."
+        ]
+      },
+      "RDD" : {
+        "message" : [
+          "Resilient Distributed Datasets (RDDs)."
+        ]
+      },
+      "SESSION_BASE_RELATION_TO_DATAFRAME" : {
+        "message" : [
+          "Invoking SparkSession 'baseRelationToDataFrame'. This is server side developer API"
+        ]
+      },
+      "SESSION_EXECUTE_COMMAND" : {
+        "message" : [
+          "Invoking SparkSession 'executeCommand'."
+        ]
+      },
+      "SESSION_EXPERIMENTAL_METHODS" : {
+        "message" : [
+          "Access to SparkSession Experimental (methods). This is server side developer API"
+        ]
+      },
+      "SESSION_LISTENER_MANAGER" : {
+        "message" : [
+          "Access to the SparkSession Listener Manager. This is server side developer API"
+        ]
+      },
+      "SESSION_SESSION_STATE" : {
+        "message" : [
+          "Access to the SparkSession Session State. This is server side developer API."
+        ]
+      },
+      "SESSION_SHARED_STATE" : {
+        "message" : [
+          "Access to the SparkSession Shared State. This is server side developer API."
+        ]
+      },
+      "SESSION_SPARK_CONTEXT" : {
+        "message" : [
+          "Access to the SparkContext."
+        ]
+      },
+      "SESSION_SQL_CONTEXT" : {
+        "message" : [
+          "Access to the SparkSession SQL Context."
+        ]
+      }
+    },
+    "sqlState" : "0A000"
+  },
   "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY" : {
     "message" : [
       "Unsupported data source type for direct query on files: <dataSourceType>"
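For reference, a sketch of how one of these conditions surfaces to a caller. The constructor call mirrors the one used by the new `ConnectClientUnsupportedErrors` helper further down; the rendered text is approximate, following Spark's standard `[CONDITION] <message> SQLSTATE: <state>` layout:

```scala
import org.apache.spark.SparkUnsupportedOperationException

val e = new SparkUnsupportedOperationException(
  "UNSUPPORTED_CONNECT_FEATURE.RDD",
  Map.empty[String, String])

// Approximately:
// [UNSUPPORTED_CONNECT_FEATURE.RDD] Feature is not supported in Spark Connect:
// Resilient Distributed Datasets (RDDs). SQLSTATE: 0A000
println(e.getMessage)
```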
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
 import org.apache.spark.sql.connect.ConnectConversions._
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.types.StructType
@@ -144,11 +145,11 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends api.Data
 
   /** @inheritdoc */
   override def json(jsonRDD: JavaRDD[String]): Dataset[Row] =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def json(jsonRDD: RDD[String]): Dataset[Row] =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def csv(path: String): DataFrame = super.csv(path)
@@ -34,10 +34,12 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
 import org.apache.spark.sql.catalyst.expressions.OrderUtils
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
 import org.apache.spark.sql.connect.ConnectConversions._
 import org.apache.spark.sql.connect.client.SparkResult
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions.{struct, to_json}
 import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter, DataFrameWriterImpl, DataFrameWriterV2Impl, MergeIntoWriterImpl, ToScalaUDF, UDFAdaptors, UnresolvedAttribute, UnresolvedRegex}
@@ -1478,8 +1480,11 @@ class Dataset[T] private[sql] (
     super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
 
   /** @inheritdoc */
-  override def rdd: RDD[T] = throwRddNotSupportedException()
+  override def rdd: RDD[T] = throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
-  override def toJavaRDD: JavaRDD[T] = throwRddNotSupportedException()
+  override def toJavaRDD: JavaRDD[T] = throw ConnectClientUnsupportedErrors.rdd()
+
+  override def queryExecution: QueryExecution =
+    throw ConnectClientUnsupportedErrors.queryExecution()
 }
@@ -29,7 +29,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
 import io.grpc.ClientInterceptor
 import org.apache.arrow.memory.RootAllocator
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.connect.proto
@@ -40,15 +40,18 @@ import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{agnosticEncoderFor, BoxedLongEncoder, UnboundRowEncoder}
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
 import org.apache.spark.sql.connect.client.{ClassFinder, CloseableIterator, SparkConnectClient, SparkResult}
 import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
 import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig, SessionCleaner, SqlApiConf}
+import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig, SessionCleaner, SessionState, SharedState, SqlApiConf}
 import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr, toTypedExpr}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming.DataStreamReader
 import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -93,7 +96,7 @@ class SparkSession private[sql] (
 
   /** @inheritdoc */
   override def sparkContext: SparkContext =
-    throw new UnsupportedOperationException("sparkContext is not supported in Spark Connect.")
+    throw ConnectClientUnsupportedErrors.sparkContext()
 
   /** @inheritdoc */
   val conf: RuntimeConfig = new ConnectRuntimeConfig(client)
@@ -153,27 +156,58 @@
 
   /** @inheritdoc */
   override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
 
   /** @inheritdoc */
   override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] =
-    throwRddNotSupportedException()
+    throw ConnectClientUnsupportedErrors.rdd()
+
+  /** @inheritdoc */
+  override def sharedState: SharedState =
+    throw ConnectClientUnsupportedErrors.sharedState()
+
+  /** @inheritdoc */
+  override def sessionState: SessionState =
+    throw ConnectClientUnsupportedErrors.sessionState()
+
+  /** @inheritdoc */
+  override def sqlContext: SQLContext =
+    throw ConnectClientUnsupportedErrors.sqlContext()
+
+  /** @inheritdoc */
+  override def listenerManager: ExecutionListenerManager =
+    throw ConnectClientUnsupportedErrors.listenerManager()
+
+  /** @inheritdoc */
+  override def experimental: ExperimentalMethods =
+    throw ConnectClientUnsupportedErrors.experimental()
+
+  /** @inheritdoc */
+  override def baseRelationToDataFrame(baseRelation: BaseRelation): api.Dataset[Row] =
+    throw ConnectClientUnsupportedErrors.baseRelationToDataFrame()
+
+  /** @inheritdoc */
+  override def executeCommand(
+      runner: String,
+      command: String,
+      options: Map[String, String]): DataFrame =
+    throw ConnectClientUnsupportedErrors.executeCommand()
 
   /** @inheritdoc */
   @Experimental
@@ -663,6 +697,9 @@ object SparkSession extends api.BaseSparkSessionCompanion with Logging {
     /** @inheritdoc */
     override def config(map: java.util.Map[String, Any]): this.type = super.config(map)
 
+    /** @inheritdoc */
+    override def config(conf: SparkConf): Builder.this.type = super.config(conf)
+
     /** @inheritdoc */
     @deprecated("enableHiveSupport does not work in Spark Connect")
     override def enableHiveSupport(): this.type = this
@@ -675,6 +712,10 @@ object SparkSession extends api.BaseSparkSessionCompanion with Logging {
     @deprecated("appName does not work in Spark Connect")
     override def appName(name: String): this.type = this
 
+    /** @inheritdoc */
+    @deprecated("withExtensions does not work in Spark Connect")
+    override def withExtensions(f: SparkSessionExtensions => Unit): this.type = this
+
     private def tryCreateSessionFromClient(): Option[SparkSession] = {
       if (client != null && client.isSessionValid) {
         Option(new SparkSession(client, planIdGenerator))
@@ -0,0 +1,59 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.connect

import org.apache.spark.SparkUnsupportedOperationException

private[sql] object ConnectClientUnsupportedErrors {

  private def unsupportedFeatureException(
      subclass: String): SparkUnsupportedOperationException = {
    new SparkUnsupportedOperationException(
      "UNSUPPORTED_CONNECT_FEATURE." + subclass,
      Map.empty[String, String])
  }

  def rdd(): SparkUnsupportedOperationException =
    unsupportedFeatureException("RDD")

  def queryExecution(): SparkUnsupportedOperationException =
    unsupportedFeatureException("DATASET_QUERY_EXECUTION")

  def executeCommand(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_EXECUTE_COMMAND")

  def baseRelationToDataFrame(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_BASE_RELATION_TO_DATAFRAME")

  def experimental(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_EXPERIMENTAL_METHODS")

  def listenerManager(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_LISTENER_MANAGER")

  def sessionState(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_SESSION_STATE")

  def sharedState(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_SHARED_STATE")

  def sparkContext(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_SPARK_CONTEXT")

  def sqlContext(): SparkUnsupportedOperationException =
    unsupportedFeatureException("SESSION_SQL_CONTEXT")
}
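Each factory above maps one-to-one onto a subclass of `UNSUPPORTED_CONNECT_FEATURE`. As a hypothetical sketch (the feature name below is invented), a future shim would follow the same pattern inside this object:

```scala
// 1. Register a new subclass under UNSUPPORTED_CONNECT_FEATURE in
//    error-conditions.json, e.g. "SESSION_MY_FEATURE" (hypothetical).
// 2. Add a matching factory method to ConnectClientUnsupportedErrors:
def myFeature(): SparkUnsupportedOperationException =
  unsupportedFeatureException("SESSION_MY_FEATURE")
// 3. Throw it from the Connect-side override:
//    override def myFeature: MyFeature = throw ConnectClientUnsupportedErrors.myFeature()
```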
@@ -19,7 +19,4 @@ package org.apache.spark
 
 package object sql {
   type DataFrame = Dataset[Row]
-
-  private[sql] def throwRddNotSupportedException(): Nothing =
-    throw new UnsupportedOperationException("RDDs are not supported in Spark Connect.")
 }