diff --git a/python/docs/source/getting_started/index.rst b/python/docs/source/getting_started/index.rst index cf4f7de11dbe3..0f3cea7d6ea58 100644 --- a/python/docs/source/getting_started/index.rst +++ b/python/docs/source/getting_started/index.rst @@ -20,7 +20,10 @@ Getting Started =============== +This page summarizes the basic steps required to set up and get started with PySpark. + .. toctree:: :maxdepth: 2 + installation quickstart diff --git a/python/docs/source/getting_started/installation.rst b/python/docs/source/getting_started/installation.rst new file mode 100644 index 0000000000000..a2de0b2e2c9f4 --- /dev/null +++ b/python/docs/source/getting_started/installation.rst @@ -0,0 +1,114 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +============ +Installation +============ + +Official releases are available from the `Apache Spark website `_. +Alternatively, you can install it via ``pip`` from PyPI. A PyPI installation is usually intended for standalone +local use, or for use as a client connecting to an existing cluster, rather than for setting up a cluster itself. + +This page includes instructions for installing PySpark using pip, Conda, downloading it manually, and building it from source. + +Python Version Supported +------------------------ + +Python 3.6 and above. + +Using PyPI +---------- + +PySpark installation using `PyPI `_ is as follows: + +.. code-block:: bash + + pip install pyspark + +Using Conda +----------- + +Conda is an open-source package management and environment management system, which is a part of the `Anaconda `_ distribution. It is both cross-platform and language-agnostic. + +Conda can be used to create a virtual environment from the terminal as shown below: + +.. code-block:: bash + + conda create -n pyspark_env + +After the virtual environment is created, it should be visible in the list of Conda environments, which can be seen using the following command: + +.. code-block:: bash + + conda env list + +The newly created environment can be activated using the following command: + +.. code-block:: bash + + conda activate pyspark_env + +In Conda versions earlier than 4.4, the following command should be used: + +.. code-block:: bash + + source activate pyspark_env + +Refer to `Using PyPI <#using-pypi>`_ to install PySpark in the newly created environment. + +Note that `PySpark at Conda `_ is available but not necessarily synced with the PySpark release cycle because it is maintained separately by the community. + +Official Release Channel +------------------------ + +Different flavors of PySpark are available in the `official release channel `_. +Any suitable version can be downloaded and extracted as shown below: + +.. code-block:: bash + + tar xzvf spark-3.0.0-bin-hadoop2.7.tgz + +Ensure the ``SPARK_HOME`` environment variable points to the directory where the code has been extracted.
+Define ``PYTHONPATH`` such that it can find PySpark and Py4J under ``SPARK_HOME/python/lib``. +One example of doing this is shown below: + +.. code-block:: bash + + cd spark-3.0.0-bin-hadoop2.7 + export SPARK_HOME=`pwd` + export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH + +Installing from Source +---------------------- + +To install PySpark from source, refer to `Building Spark `_. + +Refer to `Official Release Channel <#official-release-channel>`_ for steps to define ``PYTHONPATH``. + +Dependencies +------------ +============= ========================= ================ +Package Minimum supported version Note +============= ========================= ================ +`pandas` 0.23.2 Optional for SQL +`NumPy` 1.7 Required for ML +`pyarrow` 0.15.1 Optional for SQL +`Py4J` 0.10.9 Required +============= ========================= ================ + +**Note**: PySpark requires Java 8 or later with ``JAVA_HOME`` properly set. +If using JDK 11, set ``-Dio.netty.tryReflectionSetAccessible=true`` for Arrow-related features and refer to `Downloading `_. \ No newline at end of file diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4516c71bbc514..7d591eeea2b79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1025,7 +1025,7 @@ class Analyzer( case SessionCatalogAndIdentifier(catalog, ident) => lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table => - v1SessionCatalog.getRelation(v1Table.v1Table) + v1SessionCatalog.getRelation(v1Table.v1Table, options) case table => SubqueryAlias( catalog.name +: ident.asMultipartIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6fba3156c3919..e9a02c15f7362 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils object SessionCatalog { @@ -783,7 +784,9 @@ class SessionCatalog( } } - def getRelation(metadata: CatalogTable): LogicalPlan = { + def getRelation( + metadata: CatalogTable, + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()): LogicalPlan = { val name = metadata.identifier val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) @@ -801,7 +804,7 @@ class SessionCatalog( child = parser.parsePlan(viewText)) SubqueryAlias(multiParts, child) } else { - SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata)) + SubqueryAlias(multiParts, UnresolvedCatalogRelation(metadata, options)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 4e63ee7428d72..be09e761272ce 100644 ---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -639,7 +640,9 @@ object CatalogTypes { * A placeholder for a table relation, which will be replaced by concrete relation like * `LogicalRelation` or `HiveTableRelation`, during analysis. */ -case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode { +case class UnresolvedCatalogRelation( + tableMeta: CatalogTable, + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) extends LeafNode { assert(tableMeta.identifier.database.isDefined) override lazy val resolved: Boolean = false override def output: Seq[Attribute] = Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 4f38f8276f1a1..caa8ceea0ab91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -884,8 +884,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit castArrayCode(from.asInstanceOf[ArrayType].elementType, array.elementType, ctx) case map: MapType => castMapCode(from.asInstanceOf[MapType], map, ctx) case struct: StructType => castStructCode(from.asInstanceOf[StructType], struct, ctx) - case udt: UserDefinedType[_] - if udt.userClass == from.asInstanceOf[UserDefinedType[_]].userClass => + case udt: UserDefinedType[_] if udt.acceptsType(from) => (c, evPrim, evNull) => code"$evPrim = $c;" case _: UserDefinedType[_] => throw new SparkException(s"Cannot cast $from to $to.") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index af3a8fe684bb3..aa2610d5f87c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -292,13 +292,17 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // Final aggregate val operators = expressions.map { e => val af = e.aggregateFunction - val naf = patchAggregateFunctionChildren(af) { x => - val condition = if (e.filter.isDefined) { - e.filter.map(distinctAggFilterAttrLookup.get(_)).get - } else { - None + val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).flatten + val naf = if (af.children.forall(_.foldable)) { + // If aggregateFunction's children are all foldable, we only put the first child in + // distinctAggGroups. So here we only need to rewrite the first child to + // `if (gid = ...) ...` or `if (gid = ... and condition) ...`. 
+ val firstChild = evalWithinGroup(id, af.children.head, condition) + af.withNewChildren(firstChild +: af.children.drop(1)).asInstanceOf[AggregateFunction] + } else { + patchAggregateFunctionChildren(af) { x => + distinctAggChildAttrLookup.get(x).map(evalWithinGroup(id, _, condition)) } - distinctAggChildAttrLookup.get(x).map(evalWithinGroup(id, _, condition)) } (e, e.copy(aggregateFunction = naf, isDistinct = false, filter = None)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d90c9901f96ac..6682b0575430a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2132,7 +2132,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val kvs = units.indices.map { i => val u = units(i).getText val v = if (values(i).STRING() != null) { - string(values(i).STRING()) + val value = string(values(i).STRING()) + // SPARK-32840: For invalid cases, e.g. INTERVAL '1 day 2' hour, + // INTERVAL 'interval 1' day, we need to check ahead before they are concatenated with + // units and become valid ones, e.g. '1 day 2 hour'. + // Ideally, we only ensure the value parts don't contain any units here. + if (value.exists(Character.isLetter)) { + throw new ParseException("Can only use numbers in the interval value part for" + + s" multiple unit value pairs interval form, but got invalid value: $value", ctx) + } + value } else { values(i).getText } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dca421a09da62..2f2b645360ed6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -573,7 +573,7 @@ object SQLConf { " a heavily underestimated result.") .version("2.3.1") .doubleConf - .checkValue(_ > 0, "the value of fileDataSizeFactor must be greater than 0") + .checkValue(_ > 0, "the value of fileCompressionFactor must be greater than 0") .createWithDefault(1.0) val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") @@ -2732,6 +2732,18 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_EXTRA_OPTIONS_BEHAVIOR = + buildConf("spark.sql.legacy.extraOptionsBehavior.enabled") + .internal() + .doc("When true, the extra options will be ignored for DataFrameReader.table(). If set to " + + "false, which is the default, Spark will check whether the extra options contain a key " + + "that also appears in the table serde properties but with a different value. If no such " + + "conflict exists, the extra options will be merged with the serde properties as the scan options. 
" + + "Otherwise, an exception will be thrown.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + val TRUNCATE_TRASH_ENABLED = buildConf("spark.sql.truncate.trash.enabled") .doc("This configuration decides when truncating table, whether data files will be moved " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 76b1944d22a69..2d202ff0e7954 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -1344,6 +1344,11 @@ class CastSuite extends CastSuiteBase { } } } + + test("SPARK-32828: cast from a derived user-defined type to a base type") { + val v = Literal.create(Row(1), new ExampleSubTypeUDT()) + checkEvaluation(cast(v, new ExampleBaseTypeUDT), Row(1)) + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/TestUDT.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/TestUDT.scala index 1be8ee9dfa92b..04b090d7001d1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/TestUDT.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/TestUDT.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.types +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} @@ -59,3 +61,74 @@ object TestUDT { override def equals(other: Any): Boolean = other.isInstanceOf[MyDenseVectorUDT] } } + +// object and classes to test SPARK-19311 + +// Trait/Interface for base type +sealed trait IExampleBaseType extends Serializable { + def field: Int +} + +// Trait/Interface for derived type +sealed trait IExampleSubType extends IExampleBaseType + +// a base class +class ExampleBaseClass(override val field: Int) extends IExampleBaseType + +// a derived class +class ExampleSubClass(override val field: Int) + extends ExampleBaseClass(field) with IExampleSubType + +// UDT for base class +class ExampleBaseTypeUDT extends UserDefinedType[IExampleBaseType] { + + override def sqlType: StructType = { + StructType(Seq( + StructField("intfield", IntegerType, nullable = false))) + } + + override def serialize(obj: IExampleBaseType): InternalRow = { + val row = new GenericInternalRow(1) + row.setInt(0, obj.field) + row + } + + override def deserialize(datum: Any): IExampleBaseType = { + datum match { + case row: InternalRow => + require(row.numFields == 1, + "ExampleBaseTypeUDT requires row with length == 1") + val field = row.getInt(0) + new ExampleBaseClass(field) + } + } + + override def userClass: Class[IExampleBaseType] = classOf[IExampleBaseType] +} + +// UDT for derived class +private[spark] class ExampleSubTypeUDT extends UserDefinedType[IExampleSubType] { + + override def sqlType: StructType = { + StructType(Seq( + StructField("intfield", IntegerType, nullable = false))) + } + + override def serialize(obj: IExampleSubType): InternalRow = { + val row = new GenericInternalRow(1) + row.setInt(0, obj.field) + row + } + + override def deserialize(datum: Any): IExampleSubType = { + datum match { + case row: InternalRow => + require(row.numFields == 1, + "ExampleSubTypeUDT requires row with length == 1") + val field = row.getInt(0) + new ExampleSubClass(field) + } + } + + override def userClass: Class[IExampleSubType] = classOf[IExampleSubType] +} 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ab6b1ff5daccf..ab18a3119c09f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -260,25 +260,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") } - val updatedPaths = if (!legacyPathOptionBehavior && paths.length == 1) { - option("path", paths.head) - Seq.empty - } else { - paths - } - DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider => val catalogManager = sparkSession.sessionState.catalogManager val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = sparkSession.sessionState.conf) - val pathsOption = if (updatedPaths.isEmpty) { - None + + val optionsWithPath = if (paths.isEmpty) { + extraOptions + } else if (paths.length == 1) { + extraOptions + ("path" -> paths.head) } else { val objectMapper = new ObjectMapper() - Some("paths" -> objectMapper.writeValueAsString(updatedPaths.toArray)) + extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray)) } - val finalOptions = sessionOptions ++ extraOptions.originalMap ++ pathsOption + val finalOptions = + sessionOptions.filterKeys(!optionsWithPath.contains(_)) ++ optionsWithPath.originalMap val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) val (table, catalog, ident) = provider match { case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty => @@ -303,20 +300,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { sparkSession, DataSourceV2Relation.create(table, catalog, ident, dsOptions)) - case _ => loadV1Source(updatedPaths: _*) + case _ => loadV1Source(paths: _*) } - }.getOrElse(loadV1Source(updatedPaths: _*)) + }.getOrElse(loadV1Source(paths: _*)) } private def loadV1Source(paths: String*) = { + val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior + val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) { + (Nil, extraOptions + ("path" -> paths.head)) + } else { + (paths, extraOptions) + } + // Code path for data source v1. sparkSession.baseRelationToDataFrame( DataSource.apply( sparkSession, - paths = paths, + paths = finalPaths, userSpecifiedSchema = userSpecifiedSchema, className = source, - options = extraOptions.originalMap).resolveRelation()) + options = finalOptions.originalMap).resolveRelation()) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 25ca186c65f04..bd1997bee53f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -291,8 +291,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { "parameter. Either remove the path option, or call save() without the parameter. 
" + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") } - this.extraOptions = this.extraOptions + ("path" -> path) - save() + saveInternal(Some(path)) } /** @@ -300,7 +299,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * @since 1.4.0 */ - def save(): Unit = { + def save(): Unit = saveInternal(None) + + private def saveInternal(path: Option[String]): Unit = { if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Hive data source can only be used with tables, you can not " + "write files of Hive data source directly.") @@ -313,8 +314,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val provider = maybeV2Provider.get val sessionOptions = DataSourceV2Utils.extractSessionConfigs( provider, df.sparkSession.sessionState.conf) - val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap - val dsOptions = new CaseInsensitiveStringMap(options.toMap.asJava) + + val optionsWithPath = if (path.isEmpty) { + extraOptions + } else { + extraOptions + ("path" -> path.get) + } + + val finalOptions = + sessionOptions.filterKeys(!optionsWithPath.contains(_)) ++ optionsWithPath.originalMap + val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) def getTable: Table = { // For file source, it's expensive to infer schema/partition at each write. Here we pass @@ -350,7 +359,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // Streaming also uses the data source V2 API. So it may be that the data source // implements v2, but has no v2 implementation for batch writes. In that case, we // fall back to saving as though it's a V1 source. - return saveToV1Source() + return saveToV1Source(path) } } @@ -358,14 +367,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { checkPartitioningMatchesV2Table(table) if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { - AppendData.byName(relation, df.logicalPlan, extraOptions.toMap) + AppendData.byName(relation, df.logicalPlan, finalOptions) } } else { // Truncate the table. TableCapabilityCheck will throw a nice exception if this // isn't supported runCommand(df.sparkSession, "save") { OverwriteByExpression.byName( - relation, df.logicalPlan, Literal(true), extraOptions.toMap) + relation, df.logicalPlan, Literal(true), finalOptions) } } @@ -385,7 +394,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { partitioningAsV2, df.queryExecution.analyzed, Map(TableCatalog.PROP_PROVIDER -> source) ++ location, - extraOptions.toMap, + finalOptions, ignoreIfExists = createMode == SaveMode.Ignore) } case _: TableProvider => @@ -397,30 +406,36 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // Streaming also uses the data source V2 API. So it may be that the data source // implements v2, but has no v2 implementation for batch writes. In that case, we // fallback to saving as though it's a V1 source. - saveToV1Source() + saveToV1Source(path) } } } } else { - saveToV1Source() + saveToV1Source(path) } } - private def saveToV1Source(): Unit = { + private def saveToV1Source(path: Option[String]): Unit = { partitioningColumns.foreach { columns => extraOptions = extraOptions + ( DataSourceUtils.PARTITIONING_COLUMNS_KEY -> DataSourceUtils.encodePartitioningColumns(columns)) } + val optionsWithPath = if (path.isEmpty) { + extraOptions + } else { + extraOptions + ("path" -> path.get) + } + // Code path for data source v1. 
runCommand(df.sparkSession, "save") { DataSource( sparkSession = df.sparkSession, className = source, partitionColumns = partitioningColumns.getOrElse(Nil), - options = extraOptions.toMap).planForWriting(mode, df.logicalPlan) + options = optionsWithPath.originalMap).planForWriting(mode, df.logicalPlan) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index c240a182d32bb..1f70fde3f7654 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -371,6 +371,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) val step: Long = range.step val numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism) val numElements: BigInt = range.numElements + val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step) override val output: Seq[Attribute] = range.output @@ -396,7 +397,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) } override def inputRDDs(): Seq[RDD[InternalRow]] = { - val rdd = if (start == end || (start < end ^ 0 < step)) { + val rdd = if (isEmptyRange) { new EmptyRDD[InternalRow](sqlContext.sparkContext) } else { sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i)) @@ -562,58 +563,64 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - sqlContext - .sparkContext - .parallelize(0 until numSlices, numSlices) - .mapPartitionsWithIndex { (i, _) => - val partitionStart = (i * numElements) / numSlices * step + start - val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start - def getSafeMargin(bi: BigInt): Long = - if (bi.isValidLong) { - bi.toLong - } else if (bi > 0) { - Long.MaxValue - } else { - Long.MinValue - } - val safePartitionStart = getSafeMargin(partitionStart) - val safePartitionEnd = getSafeMargin(partitionEnd) - val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize - val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1) - val taskContext = TaskContext.get() - - val iter = new Iterator[InternalRow] { - private[this] var number: Long = safePartitionStart - private[this] var overflow: Boolean = false - private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics - - override def hasNext = - if (!overflow) { - if (step > 0) { - number < safePartitionEnd - } else { - number > safePartitionEnd - } - } else false - - override def next() = { - val ret = number - number += step - if (number < ret ^ step < 0) { - // we have Long.MaxValue + Long.MaxValue < Long.MaxValue - // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step - // back, we are pretty sure that we have an overflow. 
- overflow = true + if (isEmptyRange) { + new EmptyRDD[InternalRow](sqlContext.sparkContext) + } else { + sqlContext + .sparkContext + .parallelize(0 until numSlices, numSlices) + .mapPartitionsWithIndex { (i, _) => + val partitionStart = (i * numElements) / numSlices * step + start + val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start + + def getSafeMargin(bi: BigInt): Long = + if (bi.isValidLong) { + bi.toLong + } else if (bi > 0) { + Long.MaxValue + } else { + Long.MinValue } - numOutputRows += 1 - inputMetrics.incRecordsRead(1) - unsafeRow.setLong(0, ret) - unsafeRow + val safePartitionStart = getSafeMargin(partitionStart) + val safePartitionEnd = getSafeMargin(partitionEnd) + val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize + val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1) + val taskContext = TaskContext.get() + + val iter = new Iterator[InternalRow] { + private[this] var number: Long = safePartitionStart + private[this] var overflow: Boolean = false + private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics + + override def hasNext = + if (!overflow) { + if (step > 0) { + number < safePartitionEnd + } else { + number > safePartitionEnd + } + } else false + + override def next() = { + val ret = number + number += step + if (number < ret ^ step < 0) { + // we have Long.MaxValue + Long.MaxValue < Long.MaxValue + // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step + // back, we are pretty sure that we have an overflow. + overflow = true + } + + numOutputRows += 1 + inputMetrics.incRecordsRead(1) + unsafeRow.setLong(0, ret) + unsafeRow + } } + new InterruptibleIterator(taskContext, iter) } - new InterruptibleIterator(taskContext, iter) - } + } } override def simpleString(maxFields: Int): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index fae8de4780102..d76b4b8894783 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -88,7 +88,9 @@ case class CreateFunctionCommand( } else { // For a permanent, we will store the metadata into underlying external catalog. // This function will be loaded into the FunctionRegistry when a query uses it. - // We do not load it into FunctionRegistry right now. + // We do not load it into FunctionRegistry right now, to avoid loading the resource and + // UDF class immediately, as the Spark application to create the function may not have + // access to the resource and/or UDF class. 
catalog.createFunction(func, ignoreIfExists) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 3ccff6d89babd..1f8cfee308033 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -42,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String /** @@ -237,11 +239,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast * data source. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(table: CatalogTable): LogicalPlan = { + private def readDataSourceTable( + table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) val catalog = sparkSession.sessionState.catalog + val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table) catalog.getCachedPlan(qualifiedTableName, () => { - val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) val dataSource = DataSource( sparkSession, @@ -251,24 +254,24 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] partitionColumns = table.partitionColumnNames, bucketSpec = table.bucketSpec, className = table.provider.get, - options = table.storage.properties ++ pathOption, + options = dsOptions, catalogTable = Some(table)) LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) }) } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta), _, _, _, _) + case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options), _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => - i.copy(table = readDataSourceTable(tableMeta)) + i.copy(table = readDataSourceTable(tableMeta, options)) - case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta), _, _, _, _) => + case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _), _, _, _, _) => i.copy(table = DDLUtils.readHiveTable(tableMeta)) - case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) => - readDataSourceTable(tableMeta) + case UnresolvedCatalogRelation(tableMeta, options) if DDLUtils.isDatasourceTable(tableMeta) => + readDataSourceTable(tableMeta, options) - case UnresolvedCatalogRelation(tableMeta) => + case UnresolvedCatalogRelation(tableMeta, _) => DDLUtils.readHiveTable(tableMeta) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index abb74d8d09ec6..b4308a872bb39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -26,11 +28,13 @@ import org.json4s.jackson.Serialization import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -190,4 +194,34 @@ object DataSourceUtils { case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros case LegacyBehaviorPolicy.CORRECTED => identity[Long] } + + def generateDatasourceOptions( + extraOptions: CaseInsensitiveStringMap, table: CatalogTable): Map[String, String] = { + val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) + val options = table.storage.properties ++ pathOption + if (!SQLConf.get.getConf(SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR)) { + // Check for extra options that have the same key as a table serde property but a different value + table.storage.properties.foreach { case (k, v) => + if (extraOptions.containsKey(k) && extraOptions.get(k) != v) { + throw new AnalysisException( + s"Failed to resolve data source for the table ${table.identifier} since the table " + + s"serde property has the duplicated key $k with extra options specified for this " + + "scan operation. To fix this, you can roll back to the legacy behavior of ignoring " + + "the extra options by setting the config " + + s"${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `true`, or address the " + + s"conflicts of the same key.") + } + } + // To keep the original key from table properties, here we filter all case insensitive + // duplicate keys out from extra options. 
+ val lowerCasedDuplicatedKeys = + table.storage.properties.keySet.map(_.toLowerCase(Locale.ROOT)) + .intersect(extraOptions.keySet.asScala) + extraOptions.asCaseSensitiveMap().asScala.filterNot { + case (k, _) => lowerCasedDuplicatedKeys.contains(k.toLowerCase(Locale.ROOT)) + }.toMap ++ options + } else { + options + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 097ea61f13832..6e59ad07d7168 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -288,7 +288,8 @@ case class SortMergeJoinExec( RowIterator.fromScala(rightIter), inMemoryThreshold, spillThreshold, - cleanupResources + cleanupResources, + condition.isEmpty ) private[this] val joinRow = new JoinedRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 5302357d2bfa0..c22f917d3cf91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -188,12 +188,20 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * @since 2.0.0 */ - def load(): DataFrame = { + def load(): DataFrame = loadInternal(None) + + private def loadInternal(path: Option[String]): DataFrame = { if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Hive data source can only be used with tables, you can not " + "read files of Hive data source directly.") } + val optionsWithPath = if (path.isEmpty) { + extraOptions + } else { + extraOptions + ("path" -> path.get) + } + val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf). getConstructor().newInstance() // We need to generate the V1 data source so we can pass it to the V2 relation as a shim. @@ -203,7 +211,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = source, - options = extraOptions.toMap) + options = optionsWithPath.originalMap) val v1Relation = ds match { case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource)) case _ => None @@ -213,8 +221,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] => val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions.toMap - val dsOptions = new CaseInsensitiveStringMap(options.asJava) + val finalOptions = + sessionOptions.filterKeys(!optionsWithPath.contains(_)) ++ optionsWithPath.originalMap + val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema) import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ table match { @@ -247,7 +256,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo "parameter. Either remove the path option, or call load() without the parameter. 
" + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") } - option("path", path).load() + loadInternal(Some(path)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 51ec1e7b8fea1..682f3b98ec2e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -274,7 +274,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { "path parameter. Either remove the path option, or call start() without the parameter. " + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") } - option("path", path).start() + startInternal(Some(path)) } /** @@ -292,7 +292,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ @throws[TimeoutException] - def start(): StreamingQuery = { + def start(): StreamingQuery = startInternal(None) + + private def startInternal(path: Option[String]): StreamingQuery = { if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Hive data source can only be used with tables, you can not " + "write files of Hive data source directly.") @@ -353,29 +355,36 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { // file source v2 does not support streaming yet. classOf[FileDataSourceV2].isAssignableFrom(cls) + val optionsWithPath = if (path.isEmpty) { + extraOptions + } else { + extraOptions + ("path" -> path.get) + } + val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions.toMap - val dsOptions = new CaseInsensitiveStringMap(options.asJava) + val finalOptions = + sessionOptions.filterKeys(!optionsWithPath.contains(_)) ++ optionsWithPath.originalMap + val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) val table = DataSourceV2Utils.getTableFromProvider( provider, dsOptions, userSpecifiedSchema = None) import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ table match { case table: SupportsWrite if table.supports(STREAMING_WRITE) => table - case _ => createV1Sink() + case _ => createV1Sink(optionsWithPath) } } else { - createV1Sink() + createV1Sink(optionsWithPath) } df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, - extraOptions.toMap, + optionsWithPath.originalMap, sink, outputMode, useTempCheckpointLocation = source == "console" || source == "noop", @@ -384,11 +393,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } } - private def createV1Sink(): Sink = { + private def createV1Sink(optionsWithPath: CaseInsensitiveMap[String]): Sink = { val ds = DataSource( df.sparkSession, className = source, - options = extraOptions.toMap, + options = optionsWithPath.originalMap, partitionColumns = normalizedParCols.getOrElse(Nil)) ds.createSink(outputMode) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/count.sql b/sql/core/src/test/resources/sql-tests/inputs/count.sql index 9f9ee4a873d4f..203f04c589373 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/count.sql +++ 
b/sql/core/src/test/resources/sql-tests/inputs/count.sql @@ -25,3 +25,13 @@ SELECT count(a, b), count(b, a), count(testData.*) FROM testData; SELECT count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*) FROM testData; + +-- distinct count with multiple literals +SELECT count(DISTINCT 3,2); +SELECT count(DISTINCT 2), count(DISTINCT 2,3); +SELECT count(DISTINCT 2), count(DISTINCT 3,2); +SELECT count(DISTINCT a), count(DISTINCT 2,3) FROM testData; +SELECT count(DISTINCT a), count(DISTINCT 3,2) FROM testData; +SELECT count(DISTINCT a), count(DISTINCT 2), count(DISTINCT 2,3) FROM testData; +SELECT count(DISTINCT a), count(DISTINCT 2), count(DISTINCT 3,2) FROM testData; +SELECT count(distinct 0.8), percentile_approx(distinct a, 0.8) FROM testData; diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql index 24d303621faea..e4193d845f2e2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql @@ -43,6 +43,14 @@ SELECT SUM(salary), COUNT(DISTINCT id), COUNT(DISTINCT id) FILTER (WHERE hiredat SELECT COUNT(DISTINCT 1) FILTER (WHERE a = 1) FROM testData; SELECT COUNT(DISTINCT id) FILTER (WHERE true) FROM emp; SELECT COUNT(DISTINCT id) FILTER (WHERE false) FROM emp; +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id = 40) FROM emp; +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id = 40) FROM emp; +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id > 0) FROM emp; +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id > 0) FROM emp; +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id = 40) FROM emp; +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id = 40) FROM emp; +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id > 0) FROM emp; +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id > 0) FROM emp; -- Aggregate with filter and non-empty GroupBy expressions. 
SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData GROUP BY a; diff --git a/sql/core/src/test/resources/sql-tests/inputs/interval.sql b/sql/core/src/test/resources/sql-tests/inputs/interval.sql index e925c4508f630..c3e4748e76e3c 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/interval.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/interval.sql @@ -206,3 +206,7 @@ select interval '1.2'; select interval '- 2'; select interval '1 day -'; select interval '1 day 1'; + +select interval '1 day 2' day; +select interval 'interval 1' day; +select interval '-\t 1' day; diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out index 33d918bbeb94d..d6cf9433a06b7 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 107 +-- Number of queries: 110 -- !query @@ -1122,3 +1122,39 @@ Cannot parse the INTERVAL value: 1 day 1(line 1, pos 7) == SQL == select interval '1 day 1' -------^^^ + + +-- !query +select interval '1 day 2' day +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Can only use numbers in the interval value part for multiple unit value pairs interval form, but got invalid value: 1 day 2(line 1, pos 16) + +== SQL == +select interval '1 day 2' day +----------------^^^ + + +-- !query +select interval 'interval 1' day +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Can only use numbers in the interval value part for multiple unit value pairs interval form, but got invalid value: interval 1(line 1, pos 16) + +== SQL == +select interval 'interval 1' day +----------------^^^ + + +-- !query +select interval '-\t 1' day +-- !query schema +struct +-- !query output +-1 days diff --git a/sql/core/src/test/resources/sql-tests/results/count.sql.out b/sql/core/src/test/resources/sql-tests/results/count.sql.out index 68a5114bb5859..c0cdd0d697538 100644 --- a/sql/core/src/test/resources/sql-tests/results/count.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/count.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 5 +-- Number of queries: 13 -- !query @@ -53,3 +53,67 @@ FROM testData struct -- !query output 3 3 3 3 + + +-- !query +SELECT count(DISTINCT 3,2) +-- !query schema +struct +-- !query output +1 + + +-- !query +SELECT count(DISTINCT 2), count(DISTINCT 2,3) +-- !query schema +struct +-- !query output +1 1 + + +-- !query +SELECT count(DISTINCT 2), count(DISTINCT 3,2) +-- !query schema +struct +-- !query output +1 1 + + +-- !query +SELECT count(DISTINCT a), count(DISTINCT 2,3) FROM testData +-- !query schema +struct +-- !query output +2 1 + + +-- !query +SELECT count(DISTINCT a), count(DISTINCT 3,2) FROM testData +-- !query schema +struct +-- !query output +2 1 + + +-- !query +SELECT count(DISTINCT a), count(DISTINCT 2), count(DISTINCT 2,3) FROM testData +-- !query schema +struct +-- !query output +2 1 1 + + +-- !query +SELECT count(DISTINCT a), count(DISTINCT 2), count(DISTINCT 3,2) FROM testData +-- !query schema +struct +-- !query output +2 1 1 + + +-- !query +SELECT count(distinct 0.8), percentile_approx(distinct a, 0.8) FROM testData +-- !query schema +struct +-- !query output +1 2 \ No newline at end of file diff --git 
a/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out index c349d9d84c226..89a4da116a6b3 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 68 +-- Number of queries: 76 -- !query @@ -150,6 +150,70 @@ struct 0 +-- !query +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id = 40) FROM emp +-- !query schema +struct +-- !query output +1 0 + + +-- !query +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id = 40) FROM emp +-- !query schema +struct +-- !query output +1 0 + + +-- !query +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id > 0) FROM emp +-- !query schema +struct 0)):bigint> +-- !query output +1 1 + + +-- !query +SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id > 0) FROM emp +-- !query schema +struct 0)):bigint> +-- !query output +1 1 + + +-- !query +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id = 40) FROM emp +-- !query schema +struct +-- !query output +8 0 + + +-- !query +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id = 40) FROM emp +-- !query schema +struct +-- !query output +8 0 + + +-- !query +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 2,3) FILTER (WHERE dept_id > 0) FROM emp +-- !query schema +struct 0)):bigint> +-- !query output +8 1 + + +-- !query +SELECT COUNT(DISTINCT id), COUNT(DISTINCT 3,2) FILTER (WHERE dept_id > 0) FROM emp +-- !query schema +struct 0)):bigint> +-- !query output +8 1 + + -- !query SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData GROUP BY a -- !query schema diff --git a/sql/core/src/test/resources/sql-tests/results/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/interval.sql.out index 898be09a40318..6b149fd6bb961 100644 --- a/sql/core/src/test/resources/sql-tests/results/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/interval.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 107 +-- Number of queries: 110 -- !query @@ -1110,3 +1110,39 @@ Cannot parse the INTERVAL value: 1 day 1(line 1, pos 7) == SQL == select interval '1 day 1' -------^^^ + + +-- !query +select interval '1 day 2' day +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Can only use numbers in the interval value part for multiple unit value pairs interval form, but got invalid value: 1 day 2(line 1, pos 16) + +== SQL == +select interval '1 day 2' day +----------------^^^ + + +-- !query +select interval 'interval 1' day +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +Can only use numbers in the interval value part for multiple unit value pairs interval form, but got invalid value: interval 1(line 1, pos 16) + +== SQL == +select interval 'interval 1' day +----------------^^^ + + +-- !query +select interval '-\t 1' day +-- !query schema +struct +-- !query output +-1 days diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 942cf24a3a873..8755dccb801c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -757,6 +757,14 @@ class 
JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan ) } + // LEFT ANTI JOIN without bound condition does not spill + assertNotSpilled(sparkContext, "left anti join") { + checkAnswer( + sql("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a WHERE key = 2"), + Nil + ) + } + val expected = new ListBuffer[Row]() expected.append( Row(1, "1", 1, 1), Row(1, "1", 1, 2), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 9acb00b7b6d0b..b6ab60a91955d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -21,8 +21,8 @@ import java.time.{LocalDateTime, ZoneOffset} import java.util.Arrays import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, Literal} import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSparkSession @@ -33,77 +33,6 @@ private[sql] case class MyLabeledPoint(label: Double, features: TestUDT.MyDenseV def getFeatures: TestUDT.MyDenseVector = features } -// object and classes to test SPARK-19311 - -// Trait/Interface for base type -sealed trait IExampleBaseType extends Serializable { - def field: Int -} - -// Trait/Interface for derived type -sealed trait IExampleSubType extends IExampleBaseType - -// a base class -class ExampleBaseClass(override val field: Int) extends IExampleBaseType - -// a derived class -class ExampleSubClass(override val field: Int) - extends ExampleBaseClass(field) with IExampleSubType - -// UDT for base class -class ExampleBaseTypeUDT extends UserDefinedType[IExampleBaseType] { - - override def sqlType: StructType = { - StructType(Seq( - StructField("intfield", IntegerType, nullable = false))) - } - - override def serialize(obj: IExampleBaseType): InternalRow = { - val row = new GenericInternalRow(1) - row.setInt(0, obj.field) - row - } - - override def deserialize(datum: Any): IExampleBaseType = { - datum match { - case row: InternalRow => - require(row.numFields == 1, - "ExampleBaseTypeUDT requires row with length == 1") - val field = row.getInt(0) - new ExampleBaseClass(field) - } - } - - override def userClass: Class[IExampleBaseType] = classOf[IExampleBaseType] -} - -// UDT for derived class -private[spark] class ExampleSubTypeUDT extends UserDefinedType[IExampleSubType] { - - override def sqlType: StructType = { - StructType(Seq( - StructField("intfield", IntegerType, nullable = false))) - } - - override def serialize(obj: IExampleSubType): InternalRow = { - val row = new GenericInternalRow(1) - row.setInt(0, obj.field) - row - } - - override def deserialize(datum: Any): IExampleSubType = { - datum match { - case row: InternalRow => - require(row.numFields == 1, - "ExampleSubTypeUDT requires row with length == 1") - val field = row.getInt(0) - new ExampleSubClass(field) - } - } - - override def userClass: Class[IExampleSubType] = classOf[IExampleSubType] -} - private[sql] case class FooWithDate(date: LocalDateTime, s: String, i: Int) private[sql] class LocalDateTimeUDT extends UserDefinedType[LocalDateTime] { diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index d428b7ebc0e91..ca52e51c87ea7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -994,6 +994,13 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } } + + testWithWholeStageCodegenOnAndOff("Change the number of partitions to zero " + + "when a range is empty") { _ => + val range = spark.range(1, 1, 1, 1000) + val numPartitions = range.rdd.getNumPartitions + assert(numPartitions == 0) + } } // Used for unit-testing EnsureRequirements diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 64b0cb296635a..8d39704c61d4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -43,11 +43,13 @@ object LastOptions { var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null + var sinkParameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil def clear(): Unit = { parameters = null + sinkParameters = null schema = null partitionColumns = null reset(mockStreamSourceProvider) @@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { - LastOptions.parameters = parameters + LastOptions.sinkParameters = parameters LastOptions.partitionColumns = partitionColumns LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) (_: Long, _: DataFrame) => {} @@ -170,20 +172,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { LastOptions.clear() - val query = df.writeStream + df.writeStream .format("org.apache.spark.sql.streaming.test") .option("opt1", "5") .options(Map("opt2" -> "4")) .options(map) .option("checkpointLocation", newMetadataDir) .start() + .stop() - assert(LastOptions.parameters("opt1") == "5") - assert(LastOptions.parameters("opt2") == "4") - assert(LastOptions.parameters("opt3") == "3") - assert(LastOptions.parameters.contains("checkpointLocation")) - - query.stop() + assert(LastOptions.sinkParameters("opt1") == "5") + assert(LastOptions.sinkParameters("opt2") == "4") + assert(LastOptions.sinkParameters("opt3") == "3") + assert(LastOptions.sinkParameters.contains("checkpointLocation")) } test("SPARK-32832: later option should override earlier options for load()") { @@ -204,7 +205,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .load() assert(LastOptions.parameters.isEmpty) - val query = ds.writeStream + ds.writeStream .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .option("paTh", "1") @@ -213,8 +214,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { .option("patH", "4") .option("path", "5") .start() - assert(LastOptions.parameters("path") == "5") - query.stop() + .stop() + assert(LastOptions.sinkParameters("path") == 
"5") } test("partitioning") { @@ -787,15 +788,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { withTempDir { checkpointPath => withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true", SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) { - val query = df.writeStream + df.writeStream .format("org.apache.spark.sql.streaming.test") .option("path", "tmp4") .start("tmp5") + .stop() // The legacy behavior overwrites the path option. - assert(LastOptions.parameters("path") == "tmp5") - query.stop() + assert(LastOptions.sinkParameters("path") == "tmp5") } } } } + + test("SPARK-32853: consecutive load/start calls should be allowed") { + val dfr = spark.readStream.format(classOf[DefaultSource].getName) + var df = dfr.load("1") + df = dfr.load("2") + withTempDir { checkpointPath => + val dfw = df.writeStream + .option("checkpointLocation", checkpointPath.getCanonicalPath) + .format(classOf[DefaultSource].getName) + var query = dfw.start("1") + query.stop() + query = dfw.start("2") + query.stop() + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index c4ca85d6237b2..4e61dba4955af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression} import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase import org.apache.spark.sql.internal.SQLConf @@ -1190,4 +1190,33 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with verifyLoadFails(df.write.option("path", path).format("parquet").save(path)) verifyLoadFails(df.write.option("path", path).format("parquet").save("")) } + + test("SPARK-32853: consecutive load/save calls should be allowed") { + val dfr = spark.read.format(classOf[FakeSourceOne].getName) + dfr.load("1") + dfr.load("2") + val dfw = spark.range(10).write.format(classOf[DefaultSource].getName) + dfw.save("1") + dfw.save("2") + } + + test("SPARK-32844: DataFrameReader.table take the specified options for V1 relation") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTable("t") { + sql("CREATE TABLE t(i int, d double) USING parquet OPTIONS ('p1'='v1', 'p2'='v2')") + + val msg = intercept[AnalysisException] { + spark.read.option("P1", "v3").table("t").count() + }.getMessage + assert(msg.contains("duplicated key")) + + val df = spark.read.option("P2", "v2").option("p3", "v3").table("t") + val options = df.queryExecution.analyzed.collectFirst { + case r: LogicalRelation => r.relation.asInstanceOf[HadoopFsRelation].options + }.get + assert(options("p2") == "v2") + assert(options("p3") == "v3") + } + } + } }