diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 7a6c49f9135d0..b2b6a38916eeb 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -14,28 +14,6 @@ on: required: true jobs: - # This is on the top to give the most visibility in case of failures - hadoop-2: - name: Hadoop 2 build - runs-on: ubuntu-20.04 - steps: - - name: Checkout Spark repository - uses: actions/checkout@v2 - - name: Cache Coursier local repository - uses: actions/cache@v2 - with: - path: ~/.cache/coursier - key: hadoop-2-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - hadoop-2-coursier- - - name: Install Java 8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Build with SBT - run: | - ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Phadoop-2.7 compile test:compile - # Build: build Spark and run the tests for specified modules. build: name: "Build modules: ${{ matrix.modules }} ${{ matrix.comment }} (JDK ${{ matrix.java }}, ${{ matrix.hadoop }}, ${{ matrix.hive }})" @@ -45,7 +23,7 @@ jobs: fail-fast: false matrix: java: - - 1.8 + - 8 hadoop: - hadoop3.2 hive: @@ -71,26 +49,26 @@ jobs: include: # Hive tests - modules: hive - java: 1.8 + java: 8 hadoop: hadoop3.2 hive: hive2.3 included-tags: org.apache.spark.tags.SlowHiveTest comment: "- slow tests" - modules: hive - java: 1.8 + java: 8 hadoop: hadoop3.2 hive: hive2.3 excluded-tags: org.apache.spark.tags.SlowHiveTest comment: "- other tests" # SQL tests - modules: sql - java: 1.8 + java: 8 hadoop: hadoop3.2 hive: hive2.3 included-tags: org.apache.spark.tags.ExtendedSQLTest comment: "- slow tests" - modules: sql - java: 1.8 + java: 8 hadoop: hadoop3.2 hive: hive2.3 excluded-tags: org.apache.spark.tags.ExtendedSQLTest @@ -123,16 +101,10 @@ jobs: build/zinc-* build/scala-* build/*.jar + ~/.sbt key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} restore-keys: | build- - - name: Cache Maven local repository - uses: actions/cache@v2 - with: - path: ~/.m2/repository - key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ matrix.java }}-${{ matrix.hadoop }}-maven- - name: Cache Coursier local repository uses: actions/cache@v2 with: @@ -140,7 +112,7 @@ jobs: key: ${{ matrix.java }}-${{ matrix.hadoop }}-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} restore-keys: | ${{ matrix.java }}-${{ matrix.hadoop }}-coursier- - - name: Install JDK ${{ matrix.java }} + - name: Install Java ${{ matrix.java }} uses: actions/setup-java@v1 with: java-version: ${{ matrix.java }} @@ -163,9 +135,7 @@ jobs: run: | # Hive and SQL tests become flaky when running in parallel as it's too intensive. 
if [[ "$MODULES_TO_TEST" == "hive" ]] || [[ "$MODULES_TO_TEST" == "sql" ]]; then export SERIAL_SBT_TESTS=1; fi - mkdir -p ~/.m2 ./dev/run-tests --parallelism 2 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS" - rm -rf ~/.m2/repository/org/apache/spark - name: Upload test results to report if: always() uses: actions/upload-artifact@v2 @@ -218,16 +188,10 @@ jobs: build/zinc-* build/scala-* build/*.jar + ~/.sbt key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} restore-keys: | build- - - name: Cache Maven local repository - uses: actions/cache@v2 - with: - path: ~/.m2/repository - key: pyspark-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - pyspark-maven- - name: Cache Coursier local repository uses: actions/cache@v2 with: @@ -250,24 +214,22 @@ jobs: # Run the tests. - name: Run tests run: | - mkdir -p ~/.m2 ./dev/run-tests --parallelism 2 --modules "$MODULES_TO_TEST" - rm -rf ~/.m2/repository/org/apache/spark - name: Upload test results to report if: always() uses: actions/upload-artifact@v2 with: - name: test-results-${{ matrix.modules }}--1.8-hadoop3.2-hive2.3 + name: test-results-${{ matrix.modules }}--8-hadoop3.2-hive2.3 path: "**/target/test-reports/*.xml" - name: Upload unit tests log files if: failure() uses: actions/upload-artifact@v2 with: - name: unit-tests-log-${{ matrix.modules }}--1.8-hadoop3.2-hive2.3 + name: unit-tests-log-${{ matrix.modules }}--8-hadoop3.2-hive2.3 path: "**/target/unit-tests.log" sparkr: - name: Build modules - sparkr + name: "Build modules: sparkr" runs-on: ubuntu-20.04 container: image: dongjoon/apache-spark-github-action-image:20201025 @@ -294,16 +256,10 @@ jobs: build/zinc-* build/scala-* build/*.jar + ~/.sbt key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} restore-keys: | build- - - name: Cache Maven local repository - uses: actions/cache@v2 - with: - path: ~/.m2/repository - key: sparkr-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - sparkr-maven- - name: Cache Coursier local repository uses: actions/cache@v2 with: @@ -313,18 +269,16 @@ jobs: sparkr-coursier- - name: Run tests run: | - mkdir -p ~/.m2 # The followings are also used by `r-lib/actions/setup-r` to avoid # R issues at docker environment export TZ=UTC export _R_CHECK_SYSTEM_CLOCK_=FALSE ./dev/run-tests --parallelism 2 --modules sparkr - rm -rf ~/.m2/repository/org/apache/spark - name: Upload test results to report if: always() uses: actions/upload-artifact@v2 with: - name: test-results-sparkr--1.8-hadoop3.2-hive2.3 + name: test-results-sparkr--8-hadoop3.2-hive2.3 path: "**/target/test-reports/*.xml" # Static analysis, and documentation build @@ -334,17 +288,37 @@ jobs: steps: - name: Checkout Spark repository uses: actions/checkout@v2 + # Cache local repositories. Note that GitHub Actions cache has a 2G limit. 
+ - name: Cache Scala, SBT, Maven and Zinc + uses: actions/cache@v2 + with: + path: | + build/apache-maven-* + build/zinc-* + build/scala-* + build/*.jar + ~/.sbt + key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} + restore-keys: | + build- + - name: Cache Coursier local repository + uses: actions/cache@v2 + with: + path: ~/.cache/coursier + key: docs-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} + restore-keys: | + docs-coursier- - name: Cache Maven local repository uses: actions/cache@v2 with: path: ~/.m2/repository - key: docs-maven-repo-${{ hashFiles('**/pom.xml') }} + key: docs-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | docs-maven- - - name: Install JDK 1.8 + - name: Install Java 8 uses: actions/setup-java@v1 with: - java-version: 1.8 + java-version: 8 - name: Install Python 3.6 uses: actions/setup-python@v2 with: @@ -395,8 +369,8 @@ jobs: cd docs jekyll build - java11: - name: Java 11 build + java-11: + name: Java 11 build with Maven runs-on: ubuntu-20.04 steps: - name: Checkout Spark repository @@ -416,12 +390,12 @@ jobs: run: | export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" export MAVEN_CLI_OPTS="--no-transfer-progress" - mkdir -p ~/.m2 + # It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414. ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=11 install rm -rf ~/.m2/repository/org/apache/spark scala-213: - name: Scala 2.13 build + name: Scala 2.13 build with SBT runs-on: ubuntu-20.04 steps: - name: Checkout Spark repository @@ -433,11 +407,32 @@ jobs: key: scala-213-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} restore-keys: | scala-213-coursier- - - name: Install Java 11 + - name: Install Java 8 uses: actions/setup-java@v1 with: - java-version: 11 + java-version: 8 - name: Build with SBT run: | ./dev/change-scala-version.sh 2.13 ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Djava.version=11 -Pscala-2.13 compile test:compile + + hadoop-2: + name: Hadoop 2 build with SBT + runs-on: ubuntu-20.04 + steps: + - name: Checkout Spark repository + uses: actions/checkout@v2 + - name: Cache Coursier local repository + uses: actions/cache@v2 + with: + path: ~/.cache/coursier + key: hadoop-2-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} + restore-keys: | + hadoop-2-coursier- + - name: Install Java 8 + uses: actions/setup-java@v1 + with: + java-version: 8 + - name: Build with SBT + run: | + ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Phadoop-2.7 compile test:compile diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 404a6968ea429..b927a6b96b810 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -292,6 +292,7 @@ exportMethods("%<=>%", "floor", "format_number", "format_string", + "from_avro", "from_csv", "from_json", "from_unixtime", @@ -416,6 +417,7 @@ exportMethods("%<=>%", "timestamp_seconds", "toDegrees", "toRadians", + "to_avro", "to_csv", "to_date", "to_json", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index bcd798a8c31e2..039d28a3a37b6 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -361,6 +361,50 @@ NULL #' } NULL +#' Avro processing functions for Column operations +#' +#' Avro processing functions defined for \code{Column}. 
+#' +#' @param x Column to compute on. +#' @param jsonFormatSchema character Avro schema in JSON string format +#' @param ... additional argument(s) passed as parser options. +#' @name column_avro_functions +#' @rdname column_avro_functions +#' @family avro functions +#' @note Avro is built-in but external data source module since Spark 2.4. +#' Please deploy the application as per +#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{ +#' the deployment section +#' } of "Apache Avro Data Source Guide". +#' @examples +#' \dontrun{ +#' df <- createDataFrame(iris) +#' schema <- paste( +#' c( +#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [', +#' '{"type": ["double", "null"], "name": "Sepal_Length"},', +#' '{"type": ["double", "null"], "name": "Sepal_Width"},', +#' '{"type": ["double", "null"], "name": "Petal_Length"},', +#' '{"type": ["double", "null"], "name": "Petal_Width"},', +#' '{"type": ["string", "null"], "name": "Species"}]}' +#' ), +#' collapse="\\n" +#' ) +#' +#' df_serialized <- select( +#' df, +#' alias(to_avro(alias(struct(column("*")), "fields")), "payload") +#' ) +#' +#' df_deserialized <- select( +#' df_serialized, +#' from_avro(df_serialized$payload, schema) +#' ) +#' +#' head(df_deserialized) +#' } +NULL + #' @details #' \code{lit}: A new Column is created to represent the literal value. #' If the parameter is a Column, it is returned unchanged. @@ -4547,3 +4591,60 @@ setMethod("vector_to_array", ) column(jc) }) + +#' @details +#' \code{from_avro} Converts a binary column of Avro format into its corresponding catalyst value. +#' The specified schema must match the read data, otherwise the behavior is undefined: +#' it may fail or return arbitrary result. +#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be +#' set via the option avroSchema. +#' +#' @rdname column_avro_functions +#' @aliases from_avro from_avro,Column-method +#' @note from_avro since 3.1.0 +setMethod("from_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema, ...) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + options <- varargsToStrEnv(...) + jc <- callJStatic( + "org.apache.spark.sql.avro.functions", "from_avro", + x@jc, + jsonFormatSchema, + options + ) + column(jc) + }) + +#' @details +#' \code{to_avro} Converts a column into binary of Avro format. 
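The `from_avro`/`to_avro` wrappers added here delegate to the JVM-side `org.apache.spark.sql.avro.functions` API (see the `callJStatic` calls below). For reference, a minimal Scala sketch of the same round trip the R example documents; the session setup, toy data, and the `User` schema string are illustrative only, and the external spark-avro module must be on the classpath as the `@note` above explains.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}

object AvroRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the external spark-avro module is on the classpath.
    val spark = SparkSession.builder().master("local[*]").appName("avro-sketch").getOrCreate()
    import spark.implicits._

    // Toy schema and data (illustrative); field order and nullability mirror the DataFrame.
    val userSchema =
      """{"type": "record", "namespace": "example.avro", "name": "User", "fields": [
        |  {"type": ["string", "null"], "name": "name"},
        |  {"type": ["string", "null"], "name": "favorite_color"}
        |]}""".stripMargin

    val df = Seq(("alice", "blue"), ("bob", null)).toDF("name", "favorite_color")

    // Pack all columns into a struct and serialize it to a single Avro binary column...
    val serialized = df.select(to_avro(struct(col("*"))).as("payload"))
    // ...then deserialize it back using the JSON-format schema, as from_avro documents above.
    val deserialized = serialized.select(from_avro(col("payload"), userSchema).as("user"))
    deserialized.show(truncate = false)

    spark.stop()
  }
}
```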
+#' +#' @rdname column_avro_functions +#' @aliases to_avro to_avro,Column-method +#' @note to_avro since 3.1.0 +setMethod("to_avro", + signature(x = "characterOrColumn"), + function(x, jsonFormatSchema = NULL) { + x <- if (is.character(x)) { + column(x) + } else { + x + } + + jc <- if (is.null(jsonFormatSchema)) { + callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc) + } else { + callJStatic( + "org.apache.spark.sql.avro.functions", + "to_avro", + x@jc, + jsonFormatSchema + ) + } + column(jc) + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index e372ae27e315a..1fe6599bf1b97 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -950,7 +950,6 @@ setGeneric("current_date", function(x = "missing") { standardGeneric("current_da #' @name NULL setGeneric("current_timestamp", function(x = "missing") { standardGeneric("current_timestamp") }) - #' @rdname column_datetime_diff_functions #' @name NULL setGeneric("datediff", function(y, x) { standardGeneric("datediff") }) @@ -1015,6 +1014,10 @@ setGeneric("expr", function(x) { standardGeneric("expr") }) #' @name NULL setGeneric("flatten", function(x) { standardGeneric("flatten") }) +#' @rdname column_avro_functions +#' @name NULL +setGeneric("from_avro", function(x, ...) { standardGeneric("from_avro") }) + #' @rdname column_datetime_diff_functions #' @name NULL setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") }) @@ -1388,6 +1391,10 @@ setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") }) #' @name timestamp_seconds setGeneric("timestamp_seconds", function(x) { standardGeneric("timestamp_seconds") }) +#' @rdname column_avro_functions +#' @name NULL +setGeneric("to_avro", function(x, ...) { standardGeneric("to_avro") }) + #' @rdname column_collection_functions #' @name NULL setGeneric("transform_keys", function(x, f) { standardGeneric("transform_keys") }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 3a0d359e2ae79..45de1ef1bd3d1 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1841,6 +1841,32 @@ test_that("column functions", { ) }) +test_that("avro column functions", { + skip_if_not( + grepl("spark-avro", sparkR.conf("spark.jars", "")), + "spark-avro jar not present" + ) + + schema <- '{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + }' + + c0 <- column("foo") + c1 <- from_avro(c0, schema) + expect_s4_class(c1, "Column") + c2 <- from_avro("foo", schema) + expect_s4_class(c2, "Column") + c3 <- to_avro(c1) + expect_s4_class(c3, "Column") + c4 <- to_avro(c1, schema) + expect_s4_class(c4, "Column") +}) + test_that("column binary mathfunctions", { lines <- c("{\"a\":1, \"b\":5}", "{\"a\":2, \"b\":6}", diff --git a/R/run-tests.sh b/R/run-tests.sh index 51ca7d600caf0..edc2b2b60b60e 100755 --- a/R/run-tests.sh +++ b/R/run-tests.sh @@ -23,7 +23,18 @@ FAILED=0 LOGFILE=$FWDIR/unit-tests.out rm -f $LOGFILE -SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +SPARK_AVRO_JAR_PATH=$(find $FWDIR/../external/avro/ 
-name "spark-avro*jar" -print | egrep -v "tests.jar|test-sources.jar|sources.jar|javadoc.jar") + +if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then + SPARK_JARS=$SPARK_AVRO_JAR_PATH +fi + +if [ -z "$SPARK_JARS" ]; then + SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +else + SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +fi + FAILED=$((PIPESTATUS[0]||$FAILED)) NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)" diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index 82810dacdad84..9a71cf593e28c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -88,12 +88,14 @@ public void processFetchRequest( logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), msg.streamChunkId); } - long chunksBeingTransferred = streamManager.chunksBeingTransferred(); - if (chunksBeingTransferred >= maxChunksBeingTransferred) { - logger.warn("The number of chunks being transferred {} is above {}, close the connection.", - chunksBeingTransferred, maxChunksBeingTransferred); - channel.close(); - return; + if (maxChunksBeingTransferred < Long.MAX_VALUE) { + long chunksBeingTransferred = streamManager.chunksBeingTransferred(); + if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; + } } ManagedBuffer buf; try { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index f178928006902..4a30f8de07827 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -124,12 +124,14 @@ private void processStreamRequest(final StreamRequest req) { req.streamId); } - long chunksBeingTransferred = streamManager.chunksBeingTransferred(); - if (chunksBeingTransferred >= maxChunksBeingTransferred) { - logger.warn("The number of chunks being transferred {} is above {}, close the connection.", - chunksBeingTransferred, maxChunksBeingTransferred); - channel.close(); - return; + if (maxChunksBeingTransferred < Long.MAX_VALUE) { + long chunksBeingTransferred = streamManager.chunksBeingTransferred(); + if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of 
chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; + } } ManagedBuffer buf; try { diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 4d765481eb836..09fa91655fba5 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -21,7 +21,6 @@ import java.util.{Properties, Timer, TimerTask} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.language.postfixOps import scala.util.{Failure, Success => ScalaSuccess, Try} import org.apache.spark.annotation.{Experimental, Since} diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index c3152d9225107..cdec1982b4487 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} -import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, MapStatus} +import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} import org.apache.spark.util._ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d68015454de9d..0440a9de6ab31 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1542,8 +1542,8 @@ class SparkContext(config: SparkConf) extends Logging { val schemeCorrectedURI = uri.getScheme match { case null => new File(path).getCanonicalFile.toURI case "local" => - logWarning("File with 'local' scheme is not supported to add to file server, since " + - "it is already available on every node.") + logWarning(s"File with 'local' scheme $path is not supported to add to file server, " + + s"since it is already available on every node.") return case _ => uri } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index d7a09b599794e..136da80d48dee 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -24,13 +24,8 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.json4s.JsonAST._ -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods.{compact, render} - import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES} @@ -86,6 +81,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val conf = SparkEnv.get.conf protected val bufferSize: Int = conf.get(BUFFER_SIZE) private val reuseWorker = conf.get(PYTHON_WORKER_REUSE) + protected val simplifiedTraceback: Boolean = false // All the Python functions should have the same exec, version 
and envvars. protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars @@ -133,6 +129,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") } + if (simplifiedTraceback) { + envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1") + } // SPARK-30299 this could be wrong with standalone mode when executor // cores might not be correct because it defaults to all cores on the box. val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES)) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 5a6fa507963f0..dc2587a62ae40 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -17,8 +17,6 @@ package org.apache.spark.api.python -import java.nio.ByteOrder -import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList} import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala index 20ab6fc2f348d..41c66024272b9 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala @@ -19,7 +19,6 @@ package org.apache.spark.api.r import java.io._ -import org.apache.spark._ import org.apache.spark.broadcast.Broadcast /** diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 17733d99cd5bc..d76fb7f9a20b3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -22,7 +22,6 @@ import org.json4s.JsonDSL._ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master._ -import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.ExecutorRunner import org.apache.spark.resource.{ResourceInformation, ResourceRequirement} @@ -208,7 +207,8 @@ private[deploy] object JsonProtocol { * master * `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers * of the master - * `status` status of the master, see [[MasterState]] + * `status` status of the master, + * see [[org.apache.spark.deploy.master.RecoveryState.MasterState]] */ def writeMasterState(obj: MasterStateResponse): JObject = { val aliveWorkers = obj.workers.filter(_.isAlive()) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9a316e8c5b5a9..4b17661496808 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -391,6 +391,7 @@ private[spark] class SparkSubmit extends Logging { downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull args.files = renameResourcesToLocalFS(args.files, localFiles) + args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala index c659d32d16314..57b05ff245258 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy.history import scala.collection.mutable -import org.apache.spark.SparkContext import org.apache.spark.deploy.history.EventFilter.FilterStatistics import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e1b0fc5e45d6e..e5341aff8ce66 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -21,7 +21,7 @@ import java.io.{File, FileNotFoundException, IOException} import java.lang.{Long => JLong} import java.nio.file.Files import java.util.{Date, NoSuchElementException, ServiceLoader} -import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit} import java.util.zip.ZipOutputStream import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala index 58714f16e8417..1b8c7ff26e9f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.history -import java.io.IOException import java.util.Collection import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a582a5d045855..cccd3da323774 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -22,9 +22,7 @@ import java.util.{Date, Locale} import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.mutable import scala.util.Random -import scala.util.control.NonFatal import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 035f9d379471c..af94bd6d9e0f2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.master.ui import java.net.{InetAddress, NetworkInterface, SocketException} -import java.util.Locale import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b2bc6b3b68007..6a1fd57873c3a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -17,7 +17,6 @@ package org.apache.spark.executor -import java.io.File import java.net.URL import java.nio.ByteBuffer import java.util.Locale diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1a0ad566633da..f7246448959e9 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler import java.lang.management.ManagementFactory import java.net.{URI, URL} import java.nio.ByteBuffer -import java.util.Properties +import java.util.{Locale, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy @@ -110,7 +110,9 @@ private[spark] class Executor( .build() Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] } - private val executorSource = new ExecutorSource(threadPool, executorId) + private val schemes = conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES) + .toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty) + private val executorSource = new ExecutorSource(threadPool, executorId, schemes) // Pool used for threads that supervise task killing / cancellation private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper") // For tasks which are in the process of being killed, this map holds the most recently created diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 50207aeb3ef6b..d2765d061d662 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,10 @@ import org.apache.hadoop.fs.FileSystem import org.apache.spark.metrics.source.Source private[spark] -class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source { +class ExecutorSource( + threadPool: ThreadPoolExecutor, + executorId: String, + fileSystemSchemes: Array[String]) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme)) @@ -70,7 +73,7 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends }) // Gauge for file system stats of this executor - for (scheme <- Array("hdfs", "file")) { + for (scheme <- fileSystemSchemes) { registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L) registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L) registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2bb1290963f87..4bc49514fc5ad 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -271,6 +271,13 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("0") + private[spark] val EXECUTOR_METRICS_FILESYSTEM_SCHEMES = + ConfigBuilder("spark.executor.metrics.fileSystemSchemes") + .doc("The file system schemes to report in executor metrics.") + .version("3.1.0") + .stringConf + .createWithDefaultString("file,hdfs") + 
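The new `spark.executor.metrics.fileSystemSchemes` option above (default `file,hdfs`) drives which file-system schemes `ExecutorSource` registers gauges for, replacing the hard-coded `Array("hdfs", "file")`. A minimal sketch of setting it for an application; the extra `s3a` scheme and the local-mode session are illustrative, not part of this patch.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object FileSystemMetricsSchemesSketch {
  def main(args: Array[String]): Unit = {
    // Comma-separated schemes; the Executor change above lower-cases, trims and
    // drops empty entries before handing them to ExecutorSource.
    val conf = new SparkConf()
      .set("spark.executor.metrics.fileSystemSchemes", "file,hdfs,s3a")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fs-metrics-schemes-sketch")
      .config(conf)
      .getOrCreate()

    // Each configured scheme now gets file-system gauges (read_bytes, write_bytes,
    // read_ops, ...) in the executor metrics source.
    spark.stop()
  }
}
```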
private[spark] val EXECUTOR_JAVA_OPTIONS = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS) .withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS) diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 62fbc166167d3..cafb39ea82ad9 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.TaskContext import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.client.StreamCallbackWithID -import org.apache.spark.storage.{BlockId, ShuffleBlockId, StorageLevel} +import org.apache.spark.storage.{BlockId, StorageLevel} private[spark] trait BlockDataManager { diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index c7f5a97e35612..635efc3e22628 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -23,7 +23,6 @@ import scala.concurrent.{Future, Promise} import scala.concurrent.duration.Duration import scala.reflect.ClassTag -import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockStoreClient, DownloadFileManager} import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel} diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 806fbf52795bc..828849812bbd1 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -19,9 +19,7 @@ package org.apache.spark.network.netty import java.io.IOException import java.nio.ByteBuffer -import java.util import java.util.{HashMap => JHashMap, Map => JMap} -import java.util.concurrent.CompletableFuture import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} @@ -35,11 +33,11 @@ import org.apache.spark.ExecutorDeadException import org.apache.spark.internal.config import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} -import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientBootstrap, TransportClientFactory} +import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher} -import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, GetLocalDirsForExecutors, LocalDirsForExecutors, UploadBlock, UploadBlockStream} +import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.serializer.JavaSerializer diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 324cba5b4de42..f0239cdd9136d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -19,7 +19,6 @@ package org.apache.spark.rdd import java.io._ -import scala.Serializable import scala.collection.Map import scala.collection.immutable.NumericRange import scala.collection.mutable.ArrayBuffer diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 576a83f6ab4d9..5093a12777ad3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -20,7 +20,6 @@ package org.apache.spark.rdd import java.io.{FileNotFoundException, IOException} import java.util.concurrent.TimeUnit -import scala.collection.mutable import scala.reflect.ClassTag import scala.util.control.NonFatal diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala index 482d9e94c6dd9..22d10a975ad0f 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala @@ -20,7 +20,6 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException -import org.apache.spark.util.collection.OpenHashMap /** * Trait used to help executor/worker allocate resources. @@ -40,7 +39,7 @@ trait ResourceAllocator { * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]] * times. * - * TODO Use [[OpenHashMap]] instead to gain better performance. + * TODO Use [[org.apache.spark.util.collection.OpenHashMap]] instead to gain better performance. 
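The scaladoc above describes `ResourceAllocator`'s availability bookkeeping: each resource address can be acquired up to `slotsPerAddress` times. A toy, self-contained sketch of that idea, using the same `mutable.HashMap` construction shown below; the addresses and slot count are made up.

```scala
import scala.collection.mutable

object ResourceSlotsSketch {
  def main(args: Array[String]): Unit = {
    val resourceAddresses = Seq("0", "1") // e.g. two GPU addresses (illustrative)
    val slotsPerAddress = 2               // each address shared by up to two tasks (illustrative)

    // Same shape as addressAvailabilityMap below: address -> remaining slots.
    val addressAvailabilityMap =
      mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)

    // Acquiring an address decrements its remaining slots; releasing restores them.
    addressAvailabilityMap("0") -= 1
    assert(addressAvailabilityMap("0") == 1)
    addressAvailabilityMap("0") += 1
    assert(addressAvailabilityMap("0") == slotsPerAddress)
  }
}
```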
*/ private lazy val addressAvailabilityMap = { mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 5a9435653920f..837b2d80aace6 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -29,8 +29,8 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.resource.ResourceDiscoveryPlugin import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX} -import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING, SKIP_VALIDATE_CORES_TESTING} +import org.apache.spark.internal.config.{EXECUTOR_CORES, RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX} +import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 57e219999b0d0..b939e40f3b60c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -26,9 +26,6 @@ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet} import scala.util.Random -import com.google.common.base.Ticker -import com.google.common.cache.CacheBuilder - import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index eda1cb52d4abc..e084453be0789 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.scheduler.ExecutorLossReason import org.apache.spark.util.SerializableBuffer diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 72460180f5908..d9b8eddcf8cd0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -22,11 +22,9 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark._ -import org.apache.spark.internal.{config, Logging} -import org.apache.spark.scheduler.MapStatus +import org.apache.spark.internal.Logging import org.apache.spark.shuffle._ -import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleExecutorComponents} -import org.apache.spark.util.Utils +import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.util.collection.OpenHashSet /** diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 5c6543fe28a18..affa85b76cf19 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -22,8 +22,7 @@ import java.util.{List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap -import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException} -import org.apache.spark.resource.ResourceProfileManager +import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.status.api.v1 import org.apache.spark.ui.scope._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index 45348be5c98b9..c79f2dcd86533 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -21,7 +21,6 @@ import java.io.File import scala.annotation.meta.getter import scala.collection.JavaConverters._ -import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} import com.fasterxml.jackson.annotation.JsonInclude diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3909c02c5bb1f..924601f92c5b8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -55,7 +55,6 @@ import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter} -import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock} import org.apache.spark.storage.memory._ import org.apache.spark.unsafe.Platform diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 9699515c626bf..7a55039db1b60 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,7 +18,6 @@ package org.apache.spark.storage import java.io.IOException -import java.util.concurrent.ExecutorService import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ @@ -28,7 +27,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config -import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo} +import org.apache.spark.shuffle.ShuffleBlockInfo import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.util.ThreadUtils diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b8c5cbd121861..a7532a9870fae 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -33,7 +33,7 @@ 
import org.apache.spark.{MapOutputTrackerMaster, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.shuffle.ExternalBlockStoreClient -import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointAddress, RpcEndpointRef, RpcEnv} +import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend} import org.apache.spark.storage.BlockManagerMessages._ diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index c0a135e04bac5..a3a528cddee37 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.spark._ -import org.apache.spark.annotation.Private import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics @@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging { * @param paths Input paths to list * @param hadoopConf Hadoop configuration * @param filter Path filter used to exclude leaf files from result - * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root - * paths as opposed to nested paths encountered during recursive calls of this. * @param ignoreMissingFiles Ignore missing files that happen during recursive listing * (e.g., due to race conditions) * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, @@ -57,11 +54,22 @@ private[spark] object HadoopFSUtils extends Logging { * @param parallelismMax The maximum parallelism for listing. If the number of input paths is * larger than this value, parallelism will be throttled to this value * to avoid generating too many tasks. - * @param filterFun Optional predicate on the leaf files. Files who failed the check will be - * excluded from the results * @return for each input path, the set of discovered files for the path */ def parallelListLeafFiles( + sc: SparkContext, + paths: Seq[Path], + hadoopConf: Configuration, + filter: PathFilter, + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + parallelismThreshold: Int, + parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = { + parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true, + ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax) + } + + private def parallelListLeafFilesInternal( sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration, @@ -70,8 +78,7 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles: Boolean, ignoreLocality: Boolean, parallelismThreshold: Int, - parallelismMax: Int, - filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { + parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = { // Short-circuits parallel listing when serial listing is likely to be faster. 
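The refactoring above narrows `parallelListLeafFiles` to a public entry point without the `isRootLevel` and `filterFun` parameters (those are now handled by the internal variant) and drops the `SerializableFileStatus`/`SerializableBlockLocation` round trip. A hedged sketch of calling the new signature; since `HadoopFSUtils` is `private[spark]` this only applies to Spark-internal callers, and the paths, filter, and parallelism values are illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.SparkContext
import org.apache.spark.util.HadoopFSUtils

object LeafFileListingSketch {
  def listLeafFiles(sc: SparkContext): Unit = {
    val hadoopConf = new Configuration()
    // Example filter: skip Hadoop's temporary output directories.
    val filter: PathFilter = (path: Path) => !path.getName.startsWith("_temporary")

    val listed = HadoopFSUtils.parallelListLeafFiles(
      sc,
      paths = Seq(new Path("/tmp/example-input")),
      hadoopConf = hadoopConf,
      filter = filter,
      ignoreMissingFiles = true,
      ignoreLocality = false,
      parallelismThreshold = 32,
      parallelismMax = 10000)

    listed.foreach { case (rootPath, files) =>
      println(s"$rootPath -> ${files.length} leaf files")
    }
  }
}
```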
if (paths.size <= parallelismThreshold) { @@ -85,8 +92,7 @@ private[spark] object HadoopFSUtils extends Logging { ignoreLocality = ignoreLocality, isRootPath = isRootLevel, parallelismThreshold = parallelismThreshold, - parallelismMax = parallelismMax, - filterFun = filterFun) + parallelismMax = parallelismMax) (path, leafFiles) } } @@ -126,58 +132,16 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isRootPath = isRootLevel, - filterFun = filterFun, parallelismThreshold = Int.MaxValue, parallelismMax = 0) (path, leafFiles) }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) }.collect() } finally { sc.setJobDescription(previousJobDescription) } - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } + statusMap.toSeq } // scalastyle:off argcount @@ -197,7 +161,6 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles: Boolean, ignoreLocality: Boolean, isRootPath: Boolean, - filterFun: Option[String => Boolean], parallelismThreshold: Int, parallelismMax: Int): Seq[FileStatus] = { @@ -245,19 +208,11 @@ private[spark] object HadoopFSUtils extends Logging { Array.empty[FileStatus] } - def doFilter(statuses: Array[FileStatus]) = filterFun match { - case Some(shouldFilterOut) => - statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - case None => - statuses - } - - val filteredStatuses = doFilter(statuses) val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) + val (dirs, topLevelFiles) = statuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = contextOpt match { case Some(context) if dirs.size > parallelismThreshold => - parallelListLeafFiles( + parallelListLeafFilesInternal( context, dirs.map(_.getPath), hadoopConf = hadoopConf, @@ -265,7 +220,6 @@ private[spark] object HadoopFSUtils extends Logging { isRootLevel = false, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - filterFun = filterFun, parallelismThreshold = parallelismThreshold, parallelismMax = parallelismMax ).flatMap(_._2) @@ -279,7 +233,6 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isRootPath = false, - filterFun = filterFun, parallelismThreshold = parallelismThreshold, parallelismMax = parallelismMax) } @@ -289,8 +242,7 @@ private[spark] object HadoopFSUtils extends Logging { } val missingFiles = 
mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = doFilter(allLeafStatuses) - val resolvedLeafStatuses = filteredLeafStatuses.flatMap { + val resolvedLeafStatuses = allLeafStatuses.flatMap { case f: LocatedFileStatus => Some(f) @@ -339,22 +291,4 @@ private[spark] object HadoopFSUtils extends Logging { resolvedLeafStatuses } // scalastyle:on argcount - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) } diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 78206c51c1028..d45dc937910d9 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.language.higherKinds import scala.util.control.NonFatal import com.google.common.util.concurrent.ThreadFactoryBuilder diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7f1f3a71acab8..b743ab6507117 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -28,7 +28,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel} import java.nio.charset.StandardCharsets import java.nio.file.Files import java.security.SecureRandom -import java.util.{Arrays, Locale, Properties, Random, UUID} +import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.zip.GZIPInputStream diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index fae6c4af1240c..e6d3377120e56 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark import scala.concurrent.duration._ -import scala.language.implicitConversions import org.scalatest.concurrent.Eventually._ import org.scalatest.matchers.must.Matchers diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index e1d4eff0a62cb..e47181719a9db 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.concurrent.duration._ import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito.{mock, verify, when} +import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala index ac39f022d5ca6..7d07af4d7246b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -23,10 +23,9 @@ import scala.io.{Codec, Source} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.json4s.jackson.JsonMethods.parse -import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter -import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.status.ListenerEventsTestHelper._ diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala index 35de457ec48ce..be83ec12f92f5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala @@ -21,7 +21,6 @@ import java.io.DataOutputStream import java.net.{HttpURLConnection, URL} import java.nio.charset.StandardCharsets import java.util.Date -import javax.servlet.http.HttpServletResponse import scala.collection.mutable.HashMap @@ -32,7 +31,6 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver} import org.apache.spark.deploy.DeployTestUtils._ import org.apache.spark.deploy.master._ -import org.apache.spark.internal.config.UI import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv} diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 8e58beff74290..31049d104e63d 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.io.{Externalizable, File, ObjectInput, ObjectOutput} +import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.lang.Thread.UncaughtExceptionHandler import java.nio.ByteBuffer import java.util.Properties @@ -41,7 +41,6 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.{SimpleApplicationTest, SparkSubmitSuite} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.memory.TestMemoryManager @@ -53,7 +52,7 @@ import org.apache.spark.scheduler.{DirectTaskResult, FakeTask, ResultTask, Task, import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockManager, BlockManagerId} -import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils} +import org.apache.spark.util.{LongAccumulator, UninterruptibleThread} class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with 
Eventually with PrivateMethodTester { diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index fab7aea6c47aa..f1d7053c34594 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -29,7 +29,6 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils /** * Tests the correctness of diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala index fa1a75d076051..182c3c09e0524 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala @@ -24,7 +24,7 @@ import scala.reflect.ClassTag import scala.util.Random import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{mock, when} import org.scalatest.BeforeAndAfterEach import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 278a72a7192d8..e8e8682e20ed4 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -26,10 +26,8 @@ import org.json4s.{DefaultFormats, Extraction} import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.TestUtils._ import org.apache.spark.internal.config._ -import org.apache.spark.internal.config.Tests._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ -import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.util.Utils class ResourceUtilsSuite extends SparkFunSuite diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index e4ec62f8efc5b..b7ac9ecac2387 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -25,7 +25,6 @@ import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 47e37fc55cefe..65d51e57ee308 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -188,7 +188,6 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } test("extra resources from executor") { - 
import TestUtils._ val execCores = 3 val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 99be1faab8b85..58aa246b7358f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 0c60c42c054cf..b6a59c8bbd944 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.config import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ -import org.apache.spark.util.{Clock, ManualClock, SystemClock} +import org.apache.spark.util.{Clock, ManualClock} class FakeSchedulerBackend extends SchedulerBackend { def start(): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index e01e278f60205..a760dda3897df 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1768,7 +1768,6 @@ class TaskSetManagerSuite } test("TaskSetManager passes task resource along") { - import TestUtils._ sc = new SparkContext("local", "test") sc.conf.set(TASK_GPU_ID.amountConf, "2") diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 4a92cbcb85847..1c2326db6dc99 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -19,14 +19,12 @@ package org.apache.spark.scheduler import java.util.concurrent.Semaphore -import scala.concurrent.TimeoutException import scala.concurrent.duration._ -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, - TestUtils} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils} import org.apache.spark.internal.config import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend -import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} +import org.apache.spark.util.ThreadUtils class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index 9c0699bc981f8..d2bf385e10796 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.util.Properties -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.language.implicitConversions import scala.reflect.ClassTag diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 48e0d218c0e5c..d02d7f862df80 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.ui -import java.util.Locale import javax.servlet.http.HttpServletRequest import scala.xml.Node diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 857749e84764d..20624c743bc22 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.util import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, - FileOutputStream, InputStream, PrintStream, SequenceInputStream} -import java.lang.{Double => JDouble, Float => JFloat} + FileOutputStream, PrintStream, SequenceInputStream} import java.lang.reflect.Field import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 0e892a927906a..7da330dfe1fbf 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -42,11 +42,11 @@ jquery.dataTables.1.10.20.min.js jquery.mustache.js jsonFormatter.min.css jsonFormatter.min.js -.*avsc -.*txt -.*json -.*data -.*log +.*\.avsc +.*\.txt +.*\.json +.*\.data +.*\.log pyspark-coverage-site/* cloudpickle/* join.py @@ -98,17 +98,17 @@ local-1430917381535_2 DESCRIPTION NAMESPACE test_support/* -.*Rd +.*\.Rd help/* html/* INDEX .lintr gen-java.* -.*avpr -.*parquet +.*\.avpr +.*\.parquet spark-deps-.* -.*csv -.*tsv +.*\.csv +.*\.tsv .*\.sql .Rbuildignore META-INF/* @@ -125,3 +125,11 @@ application_1578436911597_0052 config.properties app-20200706201101-0003 py.typed +_metadata +_SUCCESS +part-00000 +.*\.res +flights_tiny.txt.1 +over1k +over10k +exported_table/* diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 8c1ab9e3c1cfe..bcf05506855c5 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -7,7 +7,7 @@ activation/1.1.1//activation-1.1.1.jar aircompressor/0.10//aircompressor-0.10.jar algebra_2.12/2.0.0-M2//algebra_2.12-2.0.0-M2.jar antlr-runtime/3.5.2//antlr-runtime-3.5.2.jar -antlr4-runtime/4.7.1//antlr4-runtime-4.7.1.jar +antlr4-runtime/4.8-1//antlr4-runtime-4.8-1.jar aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar aopalliance/1.0//aopalliance-1.0.jar apacheds-i18n/2.0.0-M15//apacheds-i18n-2.0.0-M15.jar @@ -155,26 +155,26 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/4.11.1//kubernetes-client-4.11.1.jar 
-kubernetes-model-admissionregistration/4.11.1//kubernetes-model-admissionregistration-4.11.1.jar -kubernetes-model-apiextensions/4.11.1//kubernetes-model-apiextensions-4.11.1.jar -kubernetes-model-apps/4.11.1//kubernetes-model-apps-4.11.1.jar -kubernetes-model-autoscaling/4.11.1//kubernetes-model-autoscaling-4.11.1.jar -kubernetes-model-batch/4.11.1//kubernetes-model-batch-4.11.1.jar -kubernetes-model-certificates/4.11.1//kubernetes-model-certificates-4.11.1.jar -kubernetes-model-common/4.11.1//kubernetes-model-common-4.11.1.jar -kubernetes-model-coordination/4.11.1//kubernetes-model-coordination-4.11.1.jar -kubernetes-model-core/4.11.1//kubernetes-model-core-4.11.1.jar -kubernetes-model-discovery/4.11.1//kubernetes-model-discovery-4.11.1.jar -kubernetes-model-events/4.11.1//kubernetes-model-events-4.11.1.jar -kubernetes-model-extensions/4.11.1//kubernetes-model-extensions-4.11.1.jar -kubernetes-model-metrics/4.11.1//kubernetes-model-metrics-4.11.1.jar -kubernetes-model-networking/4.11.1//kubernetes-model-networking-4.11.1.jar -kubernetes-model-policy/4.11.1//kubernetes-model-policy-4.11.1.jar -kubernetes-model-rbac/4.11.1//kubernetes-model-rbac-4.11.1.jar -kubernetes-model-scheduling/4.11.1//kubernetes-model-scheduling-4.11.1.jar -kubernetes-model-settings/4.11.1//kubernetes-model-settings-4.11.1.jar -kubernetes-model-storageclass/4.11.1//kubernetes-model-storageclass-4.11.1.jar +kubernetes-client/4.12.0//kubernetes-client-4.12.0.jar +kubernetes-model-admissionregistration/4.12.0//kubernetes-model-admissionregistration-4.12.0.jar +kubernetes-model-apiextensions/4.12.0//kubernetes-model-apiextensions-4.12.0.jar +kubernetes-model-apps/4.12.0//kubernetes-model-apps-4.12.0.jar +kubernetes-model-autoscaling/4.12.0//kubernetes-model-autoscaling-4.12.0.jar +kubernetes-model-batch/4.12.0//kubernetes-model-batch-4.12.0.jar +kubernetes-model-certificates/4.12.0//kubernetes-model-certificates-4.12.0.jar +kubernetes-model-common/4.12.0//kubernetes-model-common-4.12.0.jar +kubernetes-model-coordination/4.12.0//kubernetes-model-coordination-4.12.0.jar +kubernetes-model-core/4.12.0//kubernetes-model-core-4.12.0.jar +kubernetes-model-discovery/4.12.0//kubernetes-model-discovery-4.12.0.jar +kubernetes-model-events/4.12.0//kubernetes-model-events-4.12.0.jar +kubernetes-model-extensions/4.12.0//kubernetes-model-extensions-4.12.0.jar +kubernetes-model-metrics/4.12.0//kubernetes-model-metrics-4.12.0.jar +kubernetes-model-networking/4.12.0//kubernetes-model-networking-4.12.0.jar +kubernetes-model-policy/4.12.0//kubernetes-model-policy-4.12.0.jar +kubernetes-model-rbac/4.12.0//kubernetes-model-rbac-4.12.0.jar +kubernetes-model-scheduling/4.12.0//kubernetes-model-scheduling-4.12.0.jar +kubernetes-model-settings/4.12.0//kubernetes-model-settings-4.12.0.jar +kubernetes-model-storageclass/4.12.0//kubernetes-model-storageclass-4.12.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar @@ -195,7 +195,6 @@ objenesis/2.6//objenesis-2.6.jar okhttp/3.12.12//okhttp-3.12.12.jar okio/1.14.0//okio-1.14.0.jar opencsv/2.3//opencsv-2.3.jar -openshift-model/4.11.1//openshift-model-4.11.1.jar orc-core/1.5.12//orc-core-1.5.12.jar orc-mapreduce/1.5.12//orc-mapreduce-1.5.12.jar orc-shims/1.5.12//orc-shims-1.5.12.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index fcb993033221e..cd274bef7045b 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -7,7 +7,7 @@ 
activation/1.1.1//activation-1.1.1.jar aircompressor/0.10//aircompressor-0.10.jar algebra_2.12/2.0.0-M2//algebra_2.12-2.0.0-M2.jar antlr-runtime/3.5.2//antlr-runtime-3.5.2.jar -antlr4-runtime/4.7.1//antlr4-runtime-4.7.1.jar +antlr4-runtime/4.8-1//antlr4-runtime-4.8-1.jar aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar arpack_combined_all/0.1//arpack_combined_all-0.1.jar arrow-format/2.0.0//arrow-format-2.0.0.jar @@ -125,26 +125,26 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.30//jul-to-slf4j-1.7.30.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/4.11.1//kubernetes-client-4.11.1.jar -kubernetes-model-admissionregistration/4.11.1//kubernetes-model-admissionregistration-4.11.1.jar -kubernetes-model-apiextensions/4.11.1//kubernetes-model-apiextensions-4.11.1.jar -kubernetes-model-apps/4.11.1//kubernetes-model-apps-4.11.1.jar -kubernetes-model-autoscaling/4.11.1//kubernetes-model-autoscaling-4.11.1.jar -kubernetes-model-batch/4.11.1//kubernetes-model-batch-4.11.1.jar -kubernetes-model-certificates/4.11.1//kubernetes-model-certificates-4.11.1.jar -kubernetes-model-common/4.11.1//kubernetes-model-common-4.11.1.jar -kubernetes-model-coordination/4.11.1//kubernetes-model-coordination-4.11.1.jar -kubernetes-model-core/4.11.1//kubernetes-model-core-4.11.1.jar -kubernetes-model-discovery/4.11.1//kubernetes-model-discovery-4.11.1.jar -kubernetes-model-events/4.11.1//kubernetes-model-events-4.11.1.jar -kubernetes-model-extensions/4.11.1//kubernetes-model-extensions-4.11.1.jar -kubernetes-model-metrics/4.11.1//kubernetes-model-metrics-4.11.1.jar -kubernetes-model-networking/4.11.1//kubernetes-model-networking-4.11.1.jar -kubernetes-model-policy/4.11.1//kubernetes-model-policy-4.11.1.jar -kubernetes-model-rbac/4.11.1//kubernetes-model-rbac-4.11.1.jar -kubernetes-model-scheduling/4.11.1//kubernetes-model-scheduling-4.11.1.jar -kubernetes-model-settings/4.11.1//kubernetes-model-settings-4.11.1.jar -kubernetes-model-storageclass/4.11.1//kubernetes-model-storageclass-4.11.1.jar +kubernetes-client/4.12.0//kubernetes-client-4.12.0.jar +kubernetes-model-admissionregistration/4.12.0//kubernetes-model-admissionregistration-4.12.0.jar +kubernetes-model-apiextensions/4.12.0//kubernetes-model-apiextensions-4.12.0.jar +kubernetes-model-apps/4.12.0//kubernetes-model-apps-4.12.0.jar +kubernetes-model-autoscaling/4.12.0//kubernetes-model-autoscaling-4.12.0.jar +kubernetes-model-batch/4.12.0//kubernetes-model-batch-4.12.0.jar +kubernetes-model-certificates/4.12.0//kubernetes-model-certificates-4.12.0.jar +kubernetes-model-common/4.12.0//kubernetes-model-common-4.12.0.jar +kubernetes-model-coordination/4.12.0//kubernetes-model-coordination-4.12.0.jar +kubernetes-model-core/4.12.0//kubernetes-model-core-4.12.0.jar +kubernetes-model-discovery/4.12.0//kubernetes-model-discovery-4.12.0.jar +kubernetes-model-events/4.12.0//kubernetes-model-events-4.12.0.jar +kubernetes-model-extensions/4.12.0//kubernetes-model-extensions-4.12.0.jar +kubernetes-model-metrics/4.12.0//kubernetes-model-metrics-4.12.0.jar +kubernetes-model-networking/4.12.0//kubernetes-model-networking-4.12.0.jar +kubernetes-model-policy/4.12.0//kubernetes-model-policy-4.12.0.jar +kubernetes-model-rbac/4.12.0//kubernetes-model-rbac-4.12.0.jar +kubernetes-model-scheduling/4.12.0//kubernetes-model-scheduling-4.12.0.jar +kubernetes-model-settings/4.12.0//kubernetes-model-settings-4.12.0.jar +kubernetes-model-storageclass/4.12.0//kubernetes-model-storageclass-4.12.0.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar 
libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar @@ -165,7 +165,6 @@ objenesis/2.6//objenesis-2.6.jar okhttp/3.12.12//okhttp-3.12.12.jar okio/1.14.0//okio-1.14.0.jar opencsv/2.3//opencsv-2.3.jar -openshift-model/4.11.1//openshift-model-4.11.1.jar orc-core/1.5.12//orc-core-1.5.12.jar orc-mapreduce/1.5.12//orc-mapreduce-1.5.12.jar orc-shims/1.5.12//orc-shims-1.5.12.jar diff --git a/docs/_config.yml b/docs/_config.yml index 3be9807f81082..cd341063a1f92 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -26,3 +26,15 @@ SCALA_VERSION: "2.12.10" MESOS_VERSION: 1.0.0 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK SPARK_GITHUB_URL: https://github.com/apache/spark +# Before a new release, we should apply a new `apiKey` for the new Spark documentation +# on https://docsearch.algolia.com/. Otherwise, after release, the search results are always based +# on the latest documentation(https://spark.apache.org/docs/latest/) even when visiting the +# documentation of previous releases. +DOCSEARCH_SCRIPT: | + docsearch({ + apiKey: 'b18ca3732c502995563043aa17bc6ecb', + indexName: 'apache_spark', + inputSelector: '#docsearch-input', + enhancedSearchInput: true, + debug: false // Set debug to true if you want to inspect the dropdown + }); diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 65af17ed2e4a1..de98f29acf3b7 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -187,13 +187,7 @@
+ spark.mesos.dispatcher.queue
+ (none)
+ Set the dispatcher queue to submit the application to. The queue must be defined on the
+ dispatcher via spark.mesos.dispatcher.queue.[QueueName].
+ If no queue is specified, then the application is submitted to the "default" queue with 0.0 priority.
+ spark.mesos.dispatcher.queue.[QueueName]
+ 0.0
+ Add a queue with the given priority that applications can be submitted to via
+ spark.mesos.dispatcher.queue.
+ By default, the dispatcher has a single queue with 0.0 priority (cannot be overridden).
+ It is possible to implement a consistent and overall workload management policy throughout the lifecycle of drivers
+ by mapping priority queues to weighted Mesos roles, and by specifying a
+ spark.mesos.role along with a spark.mesos.dispatcher.queue when submitting an application.
+ For example, with the URGENT Mesos role:
+   spark.mesos.dispatcher.queue.URGENT=1.0
+   spark.mesos.dispatcher.queue=URGENT
+   spark.mesos.role=URGENT
spark.mesos.gpus.max
0
- $$ - L &= 1/2n||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 - + regTerms \\ - $$ -- */ - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - - bcFeaturesMean.destroy() - bcFeaturesStd.destroy() - - (if (state == null) null else state.x.toArray, arrayBuilder.result) - } - - private def trainOnBlocks( + private def trainImpl( instances: RDD[Instance], + actualBlockSizeInMB: Double, yMean: Double, yStd: Double, featuresMean: Array[Double], @@ -606,9 +546,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + + val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage) .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training blocks (blockSize=${$(blockSize)})") + .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") val costFun = $(loss) match { case SquaredError => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 21eb17dfaacb3..75262ac4fe06b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -30,7 +30,6 @@ import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.{DataValidators, Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import org.apache.spark.storage.StorageLevel /** * Classification model trained using Multinomial/Binary Logistic Regression. 
@@ -339,10 +338,8 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 51a6ae3c7e49b..d0b282db1ece8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -593,8 +593,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { .setMaxIter(5) .setFamily("multinomial") val model = mlor.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = mlor.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = mlor.setMaxBlockSizeInMB(s).fit(dataset) assert(model.interceptVector ~== model2.interceptVector relTol 1e-6) assert(model.coefficientMatrix ~== model2.coefficientMatrix relTol 1e-6) } @@ -606,8 +606,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { .setMaxIter(5) .setFamily("binomial") val model = blor.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = blor.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = blor.setMaxBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-6) assert(model.coefficients ~== model2.coefficients relTol 1e-6) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/PrefixSpanSuite.scala index 2252151af306b..cc8982f338702 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/PrefixSpanSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.ml.fpm import org.apache.spark.ml.util.MLTest -import org.apache.spark.sql.DataFrame class PrefixSpanSuite extends MLTest { diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index 63ccfa3834624..e745e7f67df98 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -436,8 +436,8 @@ class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest { .setQuantileProbabilities(quantileProbabilities) .setQuantilesCol("quantiles") val model = aft.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = aft.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = aft.setMaxBlockSizeInMB(s).fit(dataset) assert(model.coefficients ~== model2.coefficients relTol 1e-9) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.scale ~== model2.scale relTol 1e-9) diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index a30c47293c543..a0e17a4b40fd2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -507,8 +507,6 @@ class GeneralizedLinearRegressionSuite extends MLTest with DefaultReadWriteTest val residualDeviancesR = Array(3.809296, 3.70055) - import GeneralizedLinearRegression._ - var idx = 0 val link = "log" val dataset = datasetPoissonLogWithZero @@ -790,8 +788,6 @@ class GeneralizedLinearRegressionSuite extends MLTest with DefaultReadWriteTest val expected = Seq(0.5108256, 0.1201443, 1.600000, 1.886792, 0.625, 0.530, -0.4700036, -0.6348783, 1.325782, 1.463641) - import GeneralizedLinearRegression._ - var idx = 0 for (family <- GeneralizedLinearRegression.supportedFamilyNames.sortWith(_ < _)) { for (useWeight <- Seq(false, true)) { diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index fb70883bffc5f..b3098be0a36fb 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -672,8 +672,8 @@ class LinearRegressionSuite extends MLTest with DefaultReadWriteTest with PMMLRe .setLoss(loss) .setMaxIter(3) val model = lir.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = lir.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = lir.setMaxBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.coefficients ~== model2.coefficients relTol 1e-9) assert(model.scale ~== model2.scale relTol 1e-9) diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala index 68ba57c0d5fc8..e438a4135908e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/stat/SummarizerSuite.scala @@ -29,7 +29,6 @@ class SummarizerSuite extends SparkFunSuite with MLlibTestSparkContext { import testImplicits._ import Summarizer._ - import SummaryBuilderImpl._ private case class ExpectedMetrics( mean: Vector, diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala index 2a83d0aaf9699..3ca6816ce7c0d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.ml.tree.impl import scala.annotation.tailrec import scala.collection.mutable -import scala.language.implicitConversions import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.DecisionTreeClassificationModel diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala index dd0139b94f098..c5bf202a2d337 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala @@ 
-19,7 +19,6 @@ package org.apache.spark.ml.util import java.io.{File, IOException} -import org.json4s.JNothing import org.scalatest.Suite import org.apache.spark.{SparkException, SparkFunSuite} diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/PMMLReadWriteTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/PMMLReadWriteTest.scala index d2c4832b12bac..19e9fe4bdb30e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/PMMLReadWriteTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/PMMLReadWriteTest.scala @@ -23,10 +23,7 @@ import org.dmg.pmml.PMML import org.scalatest.Suite import org.apache.spark.SparkContext -import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Dataset trait PMMLReadWriteTest extends TempDirectory { self: Suite => /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 56d41403f74cc..8f311bbf9f840 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.mllib.clustering import java.util.{ArrayList => JArrayList} import breeze.linalg.{argmax, argtopk, max, DenseMatrix => BDM} -import org.scalatest.Assertions import org.apache.spark.SparkFunSuite import org.apache.spark.graphx.Edge diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 9d7177e0a149e..0e789821aa5f3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -22,7 +22,7 @@ import java.{util => ju} import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Matrices, Matrix, SparseMatrix, SparseVector, Vectors} +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ diff --git a/pom.xml b/pom.xml index 25c6da7100056..3ae2e7420e154 100644 --- a/pom.xml +++ b/pom.xml @@ -164,6 +164,7 @@
+ * A metadata column can expose additional metadata about a row. For example, rows from Kafka can + * use metadata columns to expose a message's topic, partition number, and offset. + *
+ * A metadata column could also be the result of a transform applied to a value in the row. For + * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In + * this case, {@link #transform()} should return a non-null {@link Transform} that produced the + * metadata column's values. + */ +@Evolving +public interface MetadataColumn { + /** + * The name of this metadata column. + * + * @return a String name + */ + String name(); + + /** + * The data type of values in this metadata column. + * + * @return a {@link DataType} + */ + DataType dataType(); + + /** + * @return whether values produced by this metadata column may be null + */ + default boolean isNullable() { + return true; + } + + /** + * Documentation for this metadata column, or null. + * + * @return a documentation String + */ + default String comment() { + return null; + } + + /** + * The {@link Transform} used to produce this metadata column from data rows, or null. + * + * @return a {@link Transform} used to produce the column's values, or null if there isn't one + */ + default Transform transform() { + return null; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java new file mode 100644 index 0000000000000..208abfc302582 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * An interface for exposing data columns for a table that are not in the table schema. For example, + * a file source could expose a "file" column that contains the path of the file that contained each + * row. + *
+ * The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in + * requested projections. Sources that implement this interface and column projection using + * {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to + * {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}. + *
+ * If a table column and a metadata column have the same name, the metadata column will never be + * requested. It is recommended that Table implementations reject data column names that conflict + * with metadata column names. + */ +@Evolving +public interface SupportsMetadataColumns extends Table { + /** + * Metadata columns that are supported by this {@link Table}. + *
+ * The columns returned by this method may be passed as {@link StructField} in requested + * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}. + *
+ * If a table column and a metadata column have the same name, the metadata column will never be
+ * requested and is ignored. It is recommended that Table implementations reject data column names
+ * that conflict with metadata column names.
+ *
+ * @return an array of {@link MetadataColumn}
+ */
+ MetadataColumn[] metadataColumns();
+}
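As a rough illustration of the two interfaces added above, here is a minimal Scala sketch of a metadata column a connector might expose. The object name, the "_partition" column, and the Kafka framing are illustrative assumptions, not part of this patch.

import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical metadata column exposing the Kafka partition number of each source record.
object PartitionMetadataColumn extends MetadataColumn {
  override def name: String = "_partition"
  override def dataType: DataType = IntegerType
  override def isNullable: Boolean = false
  override def comment: String = "Kafka partition of the source record"
  override def transform: Transform = null // not derived from a data column
}

A table that wants to surface such columns would return them from metadataColumns() in its SupportsMetadataColumns implementation.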
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
new file mode 100644
index 0000000000000..cee35cdb8d840
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Trait for getting the active SQLConf.
+ */
+trait SQLConfHelper {
+
+ /**
+ * The active config object within the current scope.
+ * See [[SQLConf.get]] for more information.
+ */
+ def conf: SQLConf = SQLConf.get
+}
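A minimal usage sketch for the new trait, using a hypothetical helper class; the point is that conf is resolved through SQLConf.get at call time instead of being threaded through constructors.

import org.apache.spark.sql.catalyst.SQLConfHelper

class CaseSensitivityHelper extends SQLConfHelper {
  // conf resolves to the active session's SQLConf via SQLConf.get.
  def caseSensitive: Boolean = conf.caseSensitiveAnalysis
}

This is the same pattern that lets the Analyzer and SessionCatalog constructors later in this patch drop their explicit SQLConf parameters.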
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index c65e181181e83..53c7f17ee6b2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
-import org.apache.spark.util.Utils
/**
@@ -894,10 +893,6 @@ trait ScalaReflection extends Logging {
import universe._
- // The Predef.Map is scala.collection.immutable.Map.
- // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
- import scala.collection.Map
-
/**
* Any codes calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
* clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 690d66bec890d..8d95d8cf49d45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -58,15 +58,14 @@ import org.apache.spark.util.Utils
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
- new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
- EmptyFunctionRegistry,
- new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
+ EmptyFunctionRegistry) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
- }),
- new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
+ })) {
+ override def resolver: Resolver = caseSensitiveResolution
+}
object FakeV2SessionCatalog extends TableCatalog {
private def fail() = throw new UnsupportedOperationException
@@ -130,10 +129,8 @@ object AnalysisContext {
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
-class Analyzer(
- override val catalogManager: CatalogManager,
- conf: SQLConf)
- extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
+class Analyzer(override val catalogManager: CatalogManager)
+ extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper {
private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
@@ -144,10 +141,8 @@ class Analyzer(
override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)
// Only for tests.
- def this(catalog: SessionCatalog, conf: SQLConf) = {
- this(
- new CatalogManager(conf, FakeV2SessionCatalog, catalog),
- conf)
+ def this(catalog: SessionCatalog) = {
+ this(new CatalogManager(FakeV2SessionCatalog, catalog))
}
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
@@ -226,6 +221,7 @@ class Analyzer(
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
+ AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@@ -921,6 +917,29 @@ class Analyzer(
}
}
+ /**
+ * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
+ *
+ * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
+ * but the relation's output does not include the metadata columns until the relation is replaced
+ * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
+ * relation's output, the analyzer will detect that nothing produces the columns.
+ *
+ * This rule only adds metadata columns when a node is resolved but is missing input from its
+ * children, so metadata columns are never added to a plan unless they are used. Checking only
+ * resolved nodes also guarantees that * expansion has already happened, so metadata columns are
+ * not accidentally selected by *.
+ */
+ object AddMetadataColumns extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
+ node resolveOperatorsUp {
+ case rel: DataSourceV2Relation =>
+ rel.withMetadataColumns()
+ }
+ }
+ }
+
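To make the effect of AddMetadataColumns concrete, a hedged usage sketch; the catalog, table, and "index" metadata column are hypothetical and assume a v2 source that implements SupportsMetadataColumns.

// "index" is not in the table schema; the reference resolves against metadataOutput, and this
// rule then rewrites the underlying DataSourceV2Relation with withMetadataColumns() so the scan
// actually produces the column. A plain SELECT * would not pick it up.
val df = spark.table("testcat.ns.t").select("id", "data", "index")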
/**
* Resolve table relations with concrete relations from v2 catalog.
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
index 6eed152e6dd77..47a45b0e529c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index d3bb72badeb13..deeb8215d22c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableCatalog, TableChange}
/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 814ea8c9768ae..7dcc6a81b48cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -304,7 +304,7 @@ object UnsupportedOperationChecker extends Logging {
case LeftAnti =>
if (right.isStreaming) {
- throwError("Left anti joins with a streaming DataFrame/Dataset " +
+ throwError(s"$LeftAnti joins with a streaming DataFrame/Dataset " +
"on the right are not supported")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
index 51eb3d033ddc4..2fa6bf0acea67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
index d8062744a4264..9234b58eb9f6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
@@ -16,10 +16,10 @@
*/
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
/**
@@ -47,10 +47,7 @@ object ResolveTimeZone extends Rule[LogicalPlan] {
* Mix-in trait for constructing valid [[Cast]] expressions.
*/
trait CastSupport {
- /**
- * Configuration used to create a valid cast expression.
- */
- def conf: SQLConf
+ self: SQLConfHelper =>
/**
* Create a Cast expression with the session local time zone.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 83acfb8d4a71c..98bd84fb94bd6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog}
/**
* Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 65601640fa044..06de023098a1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.internal.SQLConf
/**
* This file defines view types and analysis rules related to views.
@@ -54,8 +53,6 @@ import org.apache.spark.sql.internal.SQLConf
* completely resolved during the batch of Resolution.
*/
object EliminateView extends Rule[LogicalPlan] with CastSupport {
- override def conf: SQLConf = SQLConf.get
-
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has the different output attributes with the View operator. Adds a Project over
// the child of the view.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index db930cf7890e6..5643bf8b3a9b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c00d51dc3df1f..17ab6664df75c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -61,10 +61,11 @@ class SessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
functionRegistry: FunctionRegistry,
- conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
- functionResourceLoader: FunctionResourceLoader) extends Logging {
+ functionResourceLoader: FunctionResourceLoader,
+ cacheSize: Int = SQLConf.get.tableRelationCacheSize,
+ cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec
@@ -77,18 +78,21 @@ class SessionCatalog(
() => externalCatalog,
() => new GlobalTempViewManager(conf.getConf(GLOBAL_TEMP_DATABASE)),
functionRegistry,
- conf,
new Configuration(),
new CatalystSqlParser(),
- DummyFunctionResourceLoader)
+ DummyFunctionResourceLoader,
+ conf.tableRelationCacheSize,
+ conf.metadataCacheTTL)
+ }
+
+ // For testing only.
+ def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) = {
+ this(externalCatalog, functionRegistry, SQLConf.get)
}
// For testing only.
def this(externalCatalog: ExternalCatalog) = {
- this(
- externalCatalog,
- new SimpleFunctionRegistry,
- new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
+ this(externalCatalog, new SimpleFunctionRegistry)
}
lazy val externalCatalog = externalCatalogBuilder()
@@ -136,9 +140,6 @@ class SessionCatalog(
}
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
- val cacheSize = conf.tableRelationCacheSize
- val cacheTTL = conf.metadataCacheTTL
-
var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9c93691ca3b41..ee7216e93ebb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -177,8 +177,7 @@ case class CatalogTablePartition(
case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
- sortColumnNames: Seq[String]) {
- def conf: SQLConf = SQLConf.get
+ sortColumnNames: Seq[String]) extends SQLConfHelper {
if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
throw new AnalysisException(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index b61c4b8d065f2..4cd649b07a5c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* A collection of implicit conversions that create a DSL for constructing catalyst data structures.
@@ -102,6 +103,10 @@ package object dsl {
def like(other: Expression, escapeChar: Char = '\\'): Expression =
Like(expr, other, escapeChar)
def rlike(other: Expression): Expression = RLike(expr, other)
+ def likeAll(others: Expression*): Expression =
+ LikeAll(expr, others.map(_.eval(EmptyRow).asInstanceOf[UTF8String]))
+ def notLikeAll(others: Expression*): Expression =
+ NotLikeAll(expr, others.map(_.eval(EmptyRow).asInstanceOf[UTF8String]))
def contains(other: Expression): Expression = Contains(expr, other)
def startsWith(other: Expression): Expression = StartsWith(expr, other)
def endsWith(other: Expression): Expression = EndsWith(expr, other)
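A hedged sketch of the new likeAll/notLikeAll DSL helpers (intended for catalyst tests; the attribute name and patterns are made up):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal

// Builds LikeAll('name, Seq("%spark%", "%sql%")); the pattern expressions are evaluated eagerly
// to UTF8String, so they should be foldable string literals.
val pred = 'name.likeAll(Literal("%spark%"), Literal("%sql%"))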
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 3d5c1855f6975..9ab38044e6a88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.encoders
-import java.io.ObjectInputStream
-
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag}
@@ -33,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, Initial
import org.apache.spark.sql.catalyst.optimizer.{ReassignLambdaVariableID, SimplifyCasts}
import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LeafNode, LocalRelation}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, ObjectType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 4af12d61e86d9..5afc308e52ead 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.expressions
-import java.math.{BigDecimal => JavaBigDecimal}
import java.time.ZoneId
import java.util.Locale
import java.util.concurrent.TimeUnit._
@@ -25,6 +24,7 @@ import java.util.concurrent.TimeUnit._
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
+import org.apache.spark.sql.catalyst.expressions.Cast.{forceNullable, resolvableNullability}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
@@ -258,13 +258,18 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
def dataType: DataType
+ /**
+ * Returns true iff we can cast `from` type to `to` type.
+ */
+ def canCast(from: DataType, to: DataType): Boolean
+
override def toString: String = {
val ansi = if (ansiEnabled) "ansi_" else ""
s"${ansi}cast($child as ${dataType.simpleString})"
}
override def checkInputDataTypes(): TypeCheckResult = {
- if (Cast.canCast(child.dataType, dataType)) {
+ if (canCast(child.dataType, dataType)) {
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure(
@@ -1753,6 +1758,12 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
copy(timeZoneId = Option(timeZoneId))
override protected val ansiEnabled: Boolean = SQLConf.get.ansiEnabled
+
+ override def canCast(from: DataType, to: DataType): Boolean = if (ansiEnabled) {
+ AnsiCast.canCast(from, to)
+ } else {
+ Cast.canCast(from, to)
+ }
}
/**
@@ -1770,6 +1781,110 @@ case class AnsiCast(child: Expression, dataType: DataType, timeZoneId: Option[St
copy(timeZoneId = Option(timeZoneId))
override protected val ansiEnabled: Boolean = true
+
+ override def canCast(from: DataType, to: DataType): Boolean = AnsiCast.canCast(from, to)
+}
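A small sketch of how the type check now routes through canCast; the literal value is arbitrary, and string-to-date is accepted by both matrices, so both checks succeed.

import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Cast, Literal}
import org.apache.spark.sql.types.DateType

// Cast picks Cast.canCast or AnsiCast.canCast depending on spark.sql.ansi.enabled;
// AnsiCast always uses the stricter ANSI matrix.
val permissive = Cast(Literal("2020-10-01"), DateType, None)
val ansi = AnsiCast(Literal("2020-10-01"), DateType, None)
assert(permissive.checkInputDataTypes().isSuccess)
assert(ansi.checkInputDataTypes().isSuccess)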
+
+object AnsiCast {
+ /**
+ * As per section 6.13 "cast specification" in "Information technology — Database languages " +
+ * "- SQL — Part 2: Foundation (SQL/Foundation)":
+ * If the