2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -232,7 +232,7 @@ jobs:
distribution: temurin
java-version: ${{ matrix.java }}
- name: Install Python 3.8
uses: actions/setup-python@v2
uses: actions/setup-python@v4
# We should install one Python that is higher than 3+ for SQL and Yarn because:
# - SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
# - Yarn has a Python specific test too, for example, YarnClusterSuite.

@@ -22,7 +22,6 @@ import java.util
import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Unstable
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.sql.connect.service.SparkConnectService

@@ -33,7 +32,6 @@ import org.apache.spark.sql.connect.service.SparkConnectService
* implement it as a Driver Plugin. To enable Spark Connect, simply make sure that the appropriate
* JAR is available in the CLASSPATH and the driver plugin is configured to load this class.
*/
@Unstable
class SparkConnectPlugin extends SparkPlugin {

/**
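The scaladoc for SparkConnectPlugin above says Spark Connect is enabled by putting the appropriate JAR on the classpath and configuring the driver plugin to load this class. A minimal sketch of what that looks like from an application, assuming the usual spark.plugins mechanism and the fully qualified class name implied by this diff; verify both against your build:

// Illustrative only: the config key and plugin class follow the SparkPlugin
// convention; the connect JAR must already be on the driver classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                     // for local experimentation
  .appName("connect-enabled-driver")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .getOrCreate()
// Once the plugin starts, SparkConnectService is expected to accept remote clients.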

@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._

import com.google.common.collect.{Lists, Maps}

import org.apache.spark.annotation.{Since, Unstable}
import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.WriteOperation
@@ -35,8 +34,6 @@ final case class InvalidCommandInput(
private val cause: Throwable = null)
extends Exception(message, cause)

@Unstable
@Since("3.4.0")
class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) {

lazy val pythonExec =

@@ -26,6 +26,9 @@ import org.apache.spark.sql.connect.planner.DataTypeProtoConverter

/**
* A collection of implicit conversions that create a DSL for constructing connect protos.
*
* All classes in connect/dsl are considered an internal API to Spark Connect and are subject to
* change between minor releases.
*/

package object dsl {

@@ -19,7 +19,6 @@ package org.apache.spark.sql.connect.planner

import scala.collection.JavaConverters._

import org.apache.spark.annotation.{Since, Unstable}
import org.apache.spark.connect.proto
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
@@ -38,8 +37,6 @@ final case class InvalidPlanInput(
private val cause: Throwable = None.orNull)
extends Exception(message, cause)

@Unstable
@Since("3.4.0")
class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {

def transform(): LogicalPlan = {

@@ -27,7 +27,6 @@ import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.stub.StreamObserver

import org.apache.spark.SparkEnv
import org.apache.spark.annotation.{Since, Unstable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AnalyzeResponse, Request, Response, SparkConnectServiceGrpc}
import org.apache.spark.internal.Logging
@@ -44,8 +43,6 @@ import org.apache.spark.sql.execution.ExtendedMode
* @param debug
* delegates debug behavior to the handlers.
*/
@Unstable
@Since("3.4.0")
class SparkConnectService(debug: Boolean)
extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
with Logging {
@@ -127,18 +124,14 @@ class SparkConnectService(debug: Boolean)
* @param userId
* @param session
*/
@Unstable
@Since("3.4.0")
private[connect] case class SessionHolder(userId: String, session: SparkSession)
case class SessionHolder(userId: String, session: SparkSession)

/**
* Static instance of the SparkConnectService.
*
* Used to start the overall SparkConnect service and provides global state to manage the
* different SparkSession from different users connecting to the cluster.
*/
@Unstable
@Since("3.4.0")
object SparkConnectService {

private val CACHE_SIZE = 100
@@ -169,7 +162,7 @@ object SparkConnectService {
/**
* Based on the `key` find or create a new SparkSession.
*/
private[connect] def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
def getOrCreateIsolatedSession(key: SessionCacheKey): SessionHolder = {
userSessionMapping.get(
key,
() => {
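getOrCreateIsolatedSession, made public above, follows a cache-backed get-or-create pattern: userSessionMapping.get(key, () => ...) builds a new SessionHolder only when the key is absent. A standalone sketch of that pattern with a Guava cache; the size bound of 100 mirrors the CACHE_SIZE visible in this diff, but the key and holder types are illustrative stand-ins rather than the service's real ones:

import java.util.concurrent.Callable
import com.google.common.cache.CacheBuilder

// Hypothetical stand-ins for SessionCacheKey and SessionHolder.
final case class Holder(userId: String)

object SessionCacheSketch {
  // Bounded cache; entries beyond the maximum size are evicted.
  private val cache = CacheBuilder.newBuilder()
    .maximumSize(100)
    .build[String, Holder]()

  // Returns the cached holder for userId, creating it on first access.
  def getOrCreate(userId: String): Holder =
    cache.get(userId, new Callable[Holder] {
      override def call(): Holder = Holder(userId)
    })
}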

@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver

import org.apache.spark.SparkException
import org.apache.spark.annotation.{Since, Unstable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{Request, Response}
import org.apache.spark.internal.Logging
@@ -34,8 +33,6 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
import org.apache.spark.sql.internal.SQLConf

@Unstable
@Since("3.4.0")
class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging {

// The maximum batch size in bytes for a single batch of data to be returned via proto.
@@ -60,7 +57,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
processRows(request.getClientId, rows)
}

private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
def processRows(clientId: String, rows: DataFrame): Unit = {
val timeZoneId = SQLConf.get.sessionLocalTimeZone

// Only process up to 10MB of data.
20 changes: 20 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -200,6 +200,11 @@
"The <functionName> accepts only arrays of pair structs, but <childExpr> is of <childType>."
]
},
"MAP_ZIP_WITH_DIFF_TYPES" : {
"message" : [
"Input to the <functionName> should have been two maps with compatible key types, but it's [<leftType>, <rightType>]."
]
},
"NON_FOLDABLE_INPUT" : {
"message" : [
"the input <inputName> should be a foldable <inputType> expression; however, got <inputExpr>."
@@ -215,6 +220,11 @@
"Null typed values cannot be used as arguments of <functionName>."
]
},
"PARAMETER_CONSTRAINT_VIOLATION" : {
"message" : [
"The <leftExprName>(<leftExprValue>) must be <constraint> the <rightExprName>(<rightExprValue>)"
]
},
"RANGE_FRAME_INVALID_TYPE" : {
"message" : [
"The data type <orderSpecType> used in the order specification does not match the data type <valueBoundaryType> which is used in the range frame."
@@ -270,6 +280,11 @@
"The <exprName> must not be null"
]
},
"UNEXPECTED_RETURN_TYPE" : {
"message" : [
"The <functionName> requires return <expectedType> type, but the actual is <actualType> type."
]
},
"UNEXPECTED_STATIC_METHOD" : {
"message" : [
"cannot find a static method <methodName> that matches the argument types in <className>"
@@ -949,6 +964,11 @@
"Literal for '<value>' of <type>."
]
},
"MULTIPLE_BUCKET_TRANSFORMS" : {
"message" : [
"Multiple bucket TRANSFORMs."
]
},
"NATURAL_CROSS_JOIN" : {
"message" : [
"NATURAL CROSS JOIN."
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -370,6 +370,12 @@ abstract class SparkFunSuite
checkError(exception, errorClass, sqlState, parameters,
false, Array(context))

protected def checkErrorMatchPVals(
exception: SparkThrowable,
errorClass: String,
parameters: Map[String, String]): Unit =
checkError(exception, errorClass, None, parameters, matchPVals = true)

protected def checkErrorMatchPVals(
exception: SparkThrowable,
errorClass: String,
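The new three-argument checkErrorMatchPVals overload lets a test match error-message parameters against regex patterns without supplying a SQLSTATE. A hedged sketch of how a suite might use it; the thrown exception, error class, and expected values are illustrative and not taken from this PR:

import org.apache.spark.{SparkException, SparkFunSuite}

// Sketch of a test in Spark's own test tree.
class RegexParamSuite extends SparkFunSuite {
  test("parameters are matched as regex patterns") {
    val ex = intercept[SparkException] {
      throw new SparkException(
        "INTERNAL_ERROR",
        Map("message" -> "something went wrong at stage 42"),
        null)
    }
    // No SQLSTATE argument; each expected value is treated as a pattern.
    checkErrorMatchPVals(
      exception = ex,
      errorClass = "INTERNAL_ERROR",
      parameters = Map("message" -> "something went wrong at stage \\d+"))
  }
}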
1 change: 1 addition & 0 deletions project/SparkBuild.scala
@@ -1177,6 +1177,7 @@ object Unidoc {
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/kvstore")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/connect")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive")))
10 changes: 7 additions & 3 deletions python/pyspark/pandas/namespace.py
@@ -213,7 +213,7 @@ def range(


def read_csv(
path: str,
path: Union[str, List[str]],
sep: str = ",",
header: Union[str, int, None] = "infer",
names: Optional[Union[str, List[str]]] = None,
@@ -234,8 +234,8 @@

Parameters
----------
path : str
The path string storing the CSV file to be read.
path : str or list
Path(s) of the CSV file(s) to be read.
sep : str, default ‘,’
Delimiter to use. Non empty string.
header : int, default ‘infer’
@@ -296,6 +296,10 @@
Examples
--------
>>> ps.read_csv('data.csv') # doctest: +SKIP

Load multiple CSV files as a single DataFrame:

>>> ps.read_csv(['data-01.csv', 'data-02.csv']) # doctest: +SKIP
"""
# For latin-1 encoding is same as iso-8859-1, that's why it's mapped to iso-8859-1.
encoding_mapping = {"latin-1": "iso-8859-1"}

@@ -18,13 +18,13 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreePattern.{TIME_WINDOW, TreePattern}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.types._

// scalastyle:off line.size.limit line.contains.tab
@@ -71,7 +71,8 @@ case class TimeWindow(
startTime: Long) extends UnaryExpression
with ImplicitCastInputTypes
with Unevaluable
with NonSQLExpression {
with NonSQLExpression
with QueryErrorsBase {

//////////////////////////
// SQL Constructors
@@ -114,18 +115,48 @@
val dataTypeCheck = super.checkInputDataTypes()
if (dataTypeCheck.isSuccess) {
if (windowDuration <= 0) {
return TypeCheckFailure(s"The window duration ($windowDuration) must be greater than 0.")
return DataTypeMismatch(
errorSubClass = "VALUE_OUT_OF_RANGE",
messageParameters = Map(
"exprName" -> toSQLId("window_duration"),
"valueRange" -> s"(0, ${Long.MaxValue}]",
"currentValue" -> toSQLValue(windowDuration, LongType)
)
)
}
if (slideDuration <= 0) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
return DataTypeMismatch(
errorSubClass = "VALUE_OUT_OF_RANGE",
messageParameters = Map(
"exprName" -> toSQLId("slide_duration"),
"valueRange" -> s"(0, ${Long.MaxValue}]",
"currentValue" -> toSQLValue(slideDuration, LongType)
)
)
}
if (slideDuration > windowDuration) {
return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
s" to the windowDuration ($windowDuration).")
return DataTypeMismatch(
errorSubClass = "PARAMETER_CONSTRAINT_VIOLATION",
messageParameters = Map(
"leftExprName" -> toSQLId("slide_duration"),
"leftExprValue" -> toSQLValue(slideDuration, LongType),
"constraint" -> "<=",
"rightExprName" -> toSQLId("window_duration"),
"rightExprValue" -> toSQLValue(windowDuration, LongType)
)
)
}
if (startTime.abs >= slideDuration) {
return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
s"than the slideDuration ($slideDuration).")
return DataTypeMismatch(
errorSubClass = "PARAMETER_CONSTRAINT_VIOLATION",
messageParameters = Map(
"leftExprName" -> toSQLId("abs(start_time)"),
"leftExprValue" -> toSQLValue(startTime.abs, LongType),
"constraint" -> "<",
"rightExprName" -> toSQLId("slide_duration"),
"rightExprValue" -> toSQLValue(slideDuration, LongType)
)
)
}
}
dataTypeCheck
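With these changes, TimeWindow reports structured DATATYPE_MISMATCH sub-errors instead of free-form TypeCheckFailure strings. An illustrative query that should now surface the PARAMETER_CONSTRAINT_VIOLATION sub-class, because the slide duration exceeds the window duration; the exact error-class rendering may vary by version:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder().master("local[1]").appName("window-check").getOrCreate()
import spark.implicits._

val events = Seq(("2023-01-01 00:00:00", 1)).toDF("ts", "v")
  .select(col("ts").cast("timestamp").as("ts"), col("v"))

// slide (10 minutes) > window (5 minutes): analysis is expected to fail with
// DATATYPE_MISMATCH.PARAMETER_CONSTRAINT_VIOLATION ("slide_duration" must be <= "window_duration").
events.groupBy(window(col("ts"), "5 minutes", "10 minutes")).count().explain()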

@@ -24,6 +24,8 @@ import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.{BinaryLike, QuaternaryLike, TernaryLike}
import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -400,11 +402,25 @@ case class ArraySort(
if (function.dataType == IntegerType) {
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure("Return type of the given function has to be " +
"IntegerType")
DataTypeMismatch(
errorSubClass = "UNEXPECTED_RETURN_TYPE",
messageParameters = Map(
"functionName" -> toSQLId(function.prettyName),
"expectedType" -> toSQLType(IntegerType),
"actualType" -> toSQLType(function.dataType)
)
)
}
case _ =>
TypeCheckResult.TypeCheckFailure(s"$prettyName only supports array input.")
DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "1",
"requiredType" -> toSQLType(ArrayType),
"inputSql" -> toSQLExpr(argument),
"inputType" -> toSQLType(argument.dataType)
)
)
}
case failure => failure
}
@@ -804,9 +820,13 @@ case class ArrayAggregate(
case TypeCheckResult.TypeCheckSuccess =>
if (!DataType.equalsStructurally(
zero.dataType, merge.dataType, ignoreNullability = true)) {
TypeCheckResult.TypeCheckFailure(
s"argument 3 requires ${zero.dataType.simpleString} type, " +
s"however, '${merge.sql}' is of ${merge.dataType.catalogString} type.")
DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "3",
"requiredType" -> toSQLType(zero.dataType),
"inputSql" -> toSQLExpr(merge),
"inputType" -> toSQLType(merge.dataType)))
} else {
TypeCheckResult.TypeCheckSuccess
}
@@ -1025,9 +1045,14 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
if (leftKeyType.sameType(rightKeyType)) {
TypeUtils.checkForOrderingExpr(leftKeyType, prettyName)
} else {
TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
s"been two ${MapType.simpleString}s with compatible key types, but the key types are " +
s"[${leftKeyType.catalogString}, ${rightKeyType.catalogString}].")
DataTypeMismatch(
errorSubClass = "MAP_ZIP_WITH_DIFF_TYPES",
messageParameters = Map(
"functionName" -> toSQLId(prettyName),
"leftType" -> toSQLType(leftKeyType),
"rightType" -> toSQLType(rightKeyType)
)
)
}
case failure => failure
}
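The MapZipWith and ArraySort checks above now raise the MAP_ZIP_WITH_DIFF_TYPES and UNEXPECTED_RETURN_TYPE sub-classes added to error-classes.json earlier in this diff. Two illustrative queries expected to trigger them; the exact message text depends on the Spark version:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("hof-errors").getOrCreate()

// Key types INT vs STRING are incompatible: expected to fail with
// DATATYPE_MISMATCH.MAP_ZIP_WITH_DIFF_TYPES.
spark.sql("SELECT map_zip_with(map(1, 'a'), map('x', 2), (k, v1, v2) -> v1)").show()

// The comparator returns BOOLEAN instead of INT: expected to fail with
// DATATYPE_MISMATCH.UNEXPECTED_RETURN_TYPE.
spark.sql("SELECT array_sort(array(3, 1, 2), (l, r) -> l < r)").show()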