[SPARK-32017][PYTHON][BUILD] Make Pyspark Hadoop 3.2+ Variant available in PyPI #29703

Closed

3 changes: 3 additions & 0 deletions dev/create-release/release-build.sh
@@ -275,6 +275,9 @@ if [[ "$1" == "package" ]]; then
# In dry run mode, only build the first one. The keys in BINARY_PKGS_ARGS are used as the
# list of packages to be built, so it's ok for things to be missing in BINARY_PKGS_EXTRA.

# NOTE: Don't forget to update the valid combinations of distributions at
# 'python/pyspark/install.py' and 'python/docs/source/getting_started/install.rst'
# if you're changing them.
declare -A BINARY_PKGS_ARGS
BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES"

Member Author:

If we happen to drop Hive 1.2 (or add other combinations of profiles in the distributions), we'll have to change both those files and this script. I believe this could be done separately later.

if ! is_dry_run; then
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -386,6 +386,7 @@ def __hash__(self):
"pyspark.tests.test_conf",
"pyspark.tests.test_context",
"pyspark.tests.test_daemon",
"pyspark.tests.test_install_spark",
"pyspark.tests.test_join",
"pyspark.tests.test_profiler",
"pyspark.tests.test_rdd",
28 changes: 28 additions & 0 deletions python/docs/source/getting_started/install.rst
@@ -48,6 +48,34 @@ If you want to install extra dependencies for a specific component, you can install

pip install pyspark[sql]

To install PySpark with a different version of Hadoop and/or Hive, set the ``HIVE_VERSION`` and ``HADOOP_VERSION`` environment variables as below:

.. code-block:: bash

HIVE_VERSION=2.3 pip install pyspark
HADOOP_VERSION=2.7 pip install pyspark
HIVE_VERSION=1.2 HADOOP_VERSION=2.7 pip install pyspark

The default distribution has built-in Hadoop 3.2 and Hive 2.3. If users specify a different version, the pip installation automatically
downloads the corresponding distribution and uses it in PySpark. Downloading can take a while depending on the network and the mirror chosen.
It is recommended to use the ``-v`` option in ``pip`` to track the installation and download status.

.. code-block:: bash

HADOOP_VERSION=2.7 pip install pyspark -v

Supported versions are as below:

====================================== ===== =============
``HADOOP_VERSION`` \\ ``HIVE_VERSION`` 1.2   2.3 (default)
====================================== ===== =============
**2.7**                                O     O
**3.2 (default)**                      X     O
**without**                            X     O
====================================== ===== =============

Note that this installation of PySpark with different versions of Hadoop and Hive is experimental. It can change or be removed between minor releases.


Using Conda
-----------
4 changes: 4 additions & 0 deletions python/pyspark/find_spark_home.py
@@ -42,7 +42,11 @@ def is_spark_home(path):
import_error_raised = False
from importlib.util import find_spec
try:
# Spark distribution can be downloaded when HADOOP_VERSION environment variable is set.
# We should look up this directory first, see also SPARK-32017.
spark_dist_dir = "spark-distribution"
module_home = os.path.dirname(find_spec("pyspark").origin)
paths.append(os.path.join(module_home, spark_dist_dir))
paths.append(module_home)
# If we are installed in edit mode also look two dirs up
paths.append(os.path.join(module_home, "../../"))
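
A minimal sketch (not part of this diff) of the lookup order that find_spark_home.py ends up checking after this change; the candidate_homes name is illustrative only:

import os
from importlib.util import find_spec

module_home = os.path.dirname(find_spec("pyspark").origin)
candidate_homes = [
    # Distribution downloaded by pip when HADOOP_VERSION/HIVE_VERSION is set (SPARK-32017).
    os.path.join(module_home, "spark-distribution"),
    # Regular pip install with the bundled jars.
    module_home,
    # Editable ("pip install -e") install: look two directories up.
    os.path.join(module_home, "../../"),
]
print(candidate_homes)
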
170 changes: 170 additions & 0 deletions python/pyspark/install.py
@@ -0,0 +1,170 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import tarfile
import traceback
import urllib.request
from shutil import rmtree
# NOTE: do not import pyspark here because this module is used in
# setup.py, where PySpark is not installed yet.

DEFAULT_HADOOP = "hadoop3.2"
DEFAULT_HIVE = "hive2.3"
SUPPORTED_HADOOP_VERSIONS = ["hadoop2.7", "hadoop3.2", "without-hadoop"]
SUPPORTED_HIVE_VERSIONS = ["hive1.2", "hive2.3"]
UNSUPPORTED_COMBINATIONS = [
("without-hadoop", "hive1.2"),
("hadoop3.2", "hive1.2"),
]


def checked_package_name(spark_version, hadoop_version, hive_version):
if hive_version == "hive1.2":
return "%s-bin-%s-%s" % (spark_version, hadoop_version, hive_version)
else:
return "%s-bin-%s" % (spark_version, hadoop_version)


def checked_versions(spark_version, hadoop_version, hive_version):
"""
Check the valid combinations of supported versions in Spark distributions.

:param spark_version: Spark version. It should be X.X.X such as '3.0.0' or spark-3.0.0.
:param hadoop_version: Hadoop version. It should be X.X such as '2.7' or 'hadoop2.7'.
'without' and 'without-hadoop' are supported as special keywords for Hadoop free
distribution.
:param hive_version: Hive version. It should be X.X such as '1.2' or 'hive1.2'.

:return: the fully-qualified versions of Spark, Hadoop and Hive in a tuple,
for example, ('spark-3.0.0', 'hadoop3.2', 'hive2.3').
"""
if re.match("^[0-9]+\\.[0-9]+\\.[0-9]+$", spark_version):
spark_version = "spark-%s" % spark_version
if not spark_version.startswith("spark-"):
raise RuntimeError(
"Spark version should start with 'spark-' prefix; however, "
"got %s" % spark_version)

if hadoop_version == "without":
hadoop_version = "without-hadoop"

Comment on lines +63 to +64

Member:

Is "without-hadoop" also supported as a special keyword? It does not seem to be matched here.

Member Author:

It is verified by the if hadoop_version not in SUPPORTED_HADOOP_VERSIONS: check later. There are test cases here: https://github.com/apache/spark/pull/29703/files/033a33ee515b95342e8c5a74e63054d915661579#diff-e23af4eb5cc3bf6af4bc26cb801b7e84R69 and https://github.com/apache/spark/pull/29703/files/033a33ee515b95342e8c5a74e63054d915661579#diff-e23af4eb5cc3bf6af4bc26cb801b7e84R88

Users can also specify the Hadoop and Hive versions as keywords such as hadoop3.2 and hive2.3, but I didn't document this. These keywords are ported from SparkR's SparkR::install.spark.

elif re.match("^[0-9]+\\.[0-9]+$", hadoop_version):
hadoop_version = "hadoop%s" % hadoop_version

if hadoop_version not in SUPPORTED_HADOOP_VERSIONS:
raise RuntimeError(
"Spark distribution of %s is not supported. Hadoop version should be "
"one of [%s]" % (hadoop_version, ", ".join(
SUPPORTED_HADOOP_VERSIONS)))

if re.match("^[0-9]+\\.[0-9]+$", hive_version):
hive_version = "hive%s" % hive_version

if hive_version not in SUPPORTED_HIVE_VERSIONS:
raise RuntimeError(
"Spark distribution of %s is not supported. Hive version should be "
"one of [%s]" % (hive_version, ", ".join(
SUPPORTED_HIVE_VERSIONS)))

if (hadoop_version, hive_version) in UNSUPPORTED_COMBINATIONS:
raise RuntimeError("Hive 1.2 should only be with Hadoop 2.7.")

return spark_version, hadoop_version, hive_version
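
A minimal usage sketch (not part of this diff), mirroring the added tests: numeric versions, prefixed keywords, and the 'without' shorthand all normalize to the same fully-qualified tuple.

from pyspark.install import checked_versions

# Numeric and prefixed forms produce the same result.
assert checked_versions("3.0.0", "2.7", "1.2") == ("spark-3.0.0", "hadoop2.7", "hive1.2")
assert checked_versions("spark-3.0.0", "hadoop2.7", "hive1.2") == ("spark-3.0.0", "hadoop2.7", "hive1.2")
# 'without' is accepted as a shorthand for the Hadoop-free build.
assert checked_versions("3.0.0", "without", "2.3") == ("spark-3.0.0", "without-hadoop", "hive2.3")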


def install_spark(dest, spark_version, hadoop_version, hive_version):

Member Author:

I basically referred to

private def tryDownloadSpark(version: String, path: String): Unit = {
// Try a few mirrors first; fall back to Apache archive
val mirrors =
(0 until 2).flatMap { _ =>
try {
Some(getStringFromUrl("https://www.apache.org/dyn/closer.lua?preferred=true"))
} catch {
// If we can't get a mirror URL, skip it. No retry.
case _: Exception => None
}
}
val sites =
mirrors.distinct :+ "https://archive.apache.org/dist" :+ PROCESS_TABLES.releaseMirror
logInfo(s"Trying to download Spark $version from $sites")
for (site <- sites) {
val filename = s"spark-$version-bin-hadoop2.7.tgz"
val url = s"$site/spark/spark-$version/$filename"
logInfo(s"Downloading Spark $version from $url")
try {
getFileFromUrl(url, path, filename)
val downloaded = new File(sparkTestingDir, filename).getCanonicalPath
val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
Seq("mkdir", targetDir).!
val exitCode = Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
Seq("rm", downloaded).!
// For a corrupted file, `tar` returns non-zero values. However, we also need to check
// the extracted file because `tar` returns 0 for empty file.
val sparkSubmit = new File(sparkTestingDir, s"spark-$version/bin/spark-submit")
if (exitCode == 0 && sparkSubmit.exists()) {
return
} else {
Seq("rm", "-rf", targetDir).!
}
} catch {
case ex: Exception =>
logWarning(s"Failed to download Spark $version from $url: ${ex.getMessage}")
}
}
fail(s"Unable to download Spark $version")
}

and

spark/R/pkg/R/install.R

Lines 68 to 161 in f53d8c6

install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
localDir = NULL, overwrite = FALSE) {
sparkHome <- Sys.getenv("SPARK_HOME")
if (isSparkRShell()) {
stopifnot(nchar(sparkHome) > 0)
message("Spark is already running in sparkR shell.")
return(invisible(sparkHome))
} else if (!is.na(file.info(sparkHome)$isdir)) {
message("Spark package found in SPARK_HOME: ", sparkHome)
return(invisible(sparkHome))
}
version <- paste0("spark-", packageVersion("SparkR"))
hadoopVersion <- tolower(hadoopVersion)
hadoopVersionName <- hadoopVersionName(hadoopVersion)
packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
localDir <- ifelse(is.null(localDir), sparkCachePath(),
normalizePath(localDir, mustWork = FALSE))
if (is.na(file.info(localDir)$isdir)) {
dir.create(localDir, recursive = TRUE)
}
if (overwrite) {
message("Overwrite = TRUE: download and overwrite the tar file",
"and Spark package directory if they exist.")
}
releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
if (releaseUrl != "") {
packageName <- basenameSansExtFromUrl(releaseUrl)
}
packageLocalDir <- file.path(localDir, packageName)
# can use dir.exists(packageLocalDir) under R 3.2.0 or later
if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
if (releaseUrl != "") {
message(packageName, " found, setting SPARK_HOME to ", packageLocalDir)
} else {
message(version, " for Hadoop ",
if (hadoopVersion == "without") "Free build" else hadoopVersion,
" found, setting SPARK_HOME to ", packageLocalDir)
}
Sys.setenv(SPARK_HOME = packageLocalDir)
return(invisible(packageLocalDir))
} else {
message("Spark not found in the cache directory. Installation will start.")
}
packageLocalPath <- paste0(packageLocalDir, ".tgz")
tarExists <- file.exists(packageLocalPath)
if (tarExists && !overwrite) {
message("tar file found.")
} else {
if (releaseUrl != "") {
message("Downloading from alternate URL:\n- ", releaseUrl)
success <- downloadUrl(releaseUrl, packageLocalPath)
if (!success) {
unlink(packageLocalPath)
stop("Fetch failed from ", releaseUrl)
}
} else {
robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
}
}
message("Installing to ", localDir)
# There are two ways untar can fail - untar could stop() on errors like incomplete block on file
# or, tar command can return failure code
success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
error = function(e) {
message(e, "\n")
FALSE
},
warning = function(w) {
message(w, "\n")
FALSE
})
if (!tarExists || overwrite || !success) {
unlink(packageLocalPath)
if (success) {
# if tar file was not there before (or it was, but we are told to overwrite it),
# and untar is successful - set a flag that we have downloaded (and untar) Spark package.
assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
}
}
if (!success) stop("Extract archive failed.")
message("DONE.")
Sys.setenv(SPARK_HOME = packageLocalDir)
message("SPARK_HOME set to ", packageLocalDir)
invisible(packageLocalDir)
}

"""
Installs Spark that corresponds to the given Hadoop version in the current
library directory.

:param dest: The location to download and install the Spark distribution to.
:param spark_version: Spark version. It should be spark-X.X.X form.
:param hadoop_version: Hadoop version. It should be hadoopX.X
such as 'hadoop2.7' or 'without-hadoop'.
:param hive_version: Hive version. It should be hiveX.X such as 'hive1.2'.
"""

package_name = checked_package_name(spark_version, hadoop_version, hive_version)
package_local_path = os.path.join(dest, "%s.tgz" % package_name)
sites = get_preferred_mirrors()
print("Trying to download Spark %s from [%s]" % (spark_version, ", ".join(sites)))

pretty_pkg_name = "%s for Hadoop %s" % (
spark_version,
"Free build" if hadoop_version == "without" else hadoop_version)

for site in sites:
os.makedirs(dest, exist_ok=True)
url = "%s/spark/%s/%s.tgz" % (site, spark_version, package_name)

tar = None
try:
print("Downloading %s from:\n- %s" % (pretty_pkg_name, url))
download_to_file(urllib.request.urlopen(url), package_local_path)

print("Installing to %s" % dest)
tar = tarfile.open(package_local_path, "r:gz")
for member in tar.getmembers():
if member.name == package_name:
# Skip the root directory.
continue
member.name = os.path.relpath(member.name, package_name + os.path.sep)
tar.extract(member, dest)
return
except Exception:
print("Failed to download %s from %s:" % (pretty_pkg_name, url))
traceback.print_exc()
rmtree(dest, ignore_errors=True)
finally:
if tar is not None:
tar.close()
if os.path.exists(package_local_path):
os.remove(package_local_path)
raise IOError("Unable to download %s." % pretty_pkg_name)
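
A minimal usage sketch (not part of this diff), equivalent to what the new test below does; it downloads a full Spark distribution, so it is slow and needs network access:

import tempfile
from pyspark.install import checked_versions, install_spark

spark_version, hadoop_version, hive_version = checked_versions("3.0.1", "3.2", "2.3")
with tempfile.TemporaryDirectory() as tmp_dir:
    # Downloads e.g. spark-3.0.1-bin-hadoop3.2.tgz from a mirror and extracts it into tmp_dir,
    # stripping the top-level directory so that tmp_dir/bin/spark-submit exists afterwards.
    install_spark(
        dest=tmp_dir,
        spark_version=spark_version,
        hadoop_version=hadoop_version,
        hive_version=hive_version)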


def get_preferred_mirrors():
mirror_urls = []
for _ in range(3):
try:
response = urllib.request.urlopen(
"https://www.apache.org/dyn/closer.lua?preferred=true")
mirror_urls.append(response.read().decode('utf-8'))
except Exception:
# If we can't get a mirror URL, skip it. No retry.
pass

default_sites = [
"https://archive.apache.org/dist", "https://dist.apache.org/repos/dist/release"]

Contributor:

All non-current versions of Spark will hit the archive, since the mirrors only maintain the latest version. I don't think the archive will be able to handle the volume of traffic that will eventually come its way from various people downloading (and re-downloading) Spark, e.g. as part of CI setup.

Member Author:

When they install, I think it will likely be the latest version in most cases. I guess that's the reason why we moved the old versions into the archive and keep the latest versions in the mirrors.

People are already using this to download old versions or to set up CI. This PR just makes it easier to do.

return list(set(mirror_urls)) + default_sites
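
For reference, the resolved site list can be checked directly; a quick sketch (requires network access, and the preferred mirror and ordering vary because of the set de-duplication):

from pyspark.install import get_preferred_mirrors

# The preferred mirror (queried up to three times and de-duplicated) plus the two default sites.
print(get_preferred_mirrors())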


def download_to_file(response, path, chunk_size=1024 * 1024):
total_size = int(response.info().get('Content-Length').strip())
bytes_so_far = 0

with open(path, mode="wb") as dest:
while True:
chunk = response.read(chunk_size)
bytes_so_far += len(chunk)
if not chunk:
break
dest.write(chunk)
print("Downloaded %d of %d bytes (%0.2f%%)" % (
bytes_so_far,
total_size,
round(float(bytes_so_far) / total_size * 100, 2)))

Member Author:

The purpose of showing the progress is twofold:

  • The print out will be seen when pip install ... -v.

  • The output from plain pip without -v shows that it is in progress (otherwise it looks like it hangs). For example:

      Building wheel for pyspark (setup.py) ... -
      Building wheel for pyspark (setup.py) ... \
      Building wheel for pyspark (setup.py) ... |
    

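The chunked progress printing can also be exercised on its own; a minimal sketch, assuming network access and a server that returns a Content-Length header (the KEYS file is just a small example target, not something the PR downloads):

import urllib.request
from pyspark.install import download_to_file

# Downloads in 1 MiB chunks and prints the running byte count and percentage per chunk.
url = "https://archive.apache.org/dist/spark/KEYS"
download_to_file(urllib.request.urlopen(url), "/tmp/spark-KEYS")
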
112 changes: 112 additions & 0 deletions python/pyspark/tests/test_install_spark.py
@@ -0,0 +1,112 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import tempfile
import unittest

from pyspark.install import install_spark, DEFAULT_HADOOP, DEFAULT_HIVE, \
UNSUPPORTED_COMBINATIONS, checked_versions, checked_package_name


class SparkInstallationTestCase(unittest.TestCase):

def test_install_spark(self):
# Test only one case. Testing this is expensive because it needs to download
# the Spark distribution.
spark_version, hadoop_version, hive_version = checked_versions("3.0.1", "3.2", "2.3")

with tempfile.TemporaryDirectory() as tmp_dir:
install_spark(
dest=tmp_dir,
spark_version=spark_version,
hadoop_version=hadoop_version,
hive_version=hive_version)

self.assertTrue(os.path.isdir("%s/jars" % tmp_dir))
self.assertTrue(os.path.exists("%s/bin/spark-submit" % tmp_dir))
self.assertTrue(os.path.exists("%s/RELEASE" % tmp_dir))

def test_package_name(self):
self.assertEqual(
"spark-3.0.0-bin-hadoop3.2-hive1.2",
checked_package_name("spark-3.0.0", "hadoop3.2", "hive1.2"))
self.assertEqual(
"spark-3.0.0-bin-hadoop3.2",
checked_package_name("spark-3.0.0", "hadoop3.2", "hive2.3"))

def test_checked_versions(self):
test_version = "3.0.1" # Just pick one version to test.

# Positive test cases
self.assertEqual(
("spark-3.0.0", "hadoop2.7", "hive1.2"),
checked_versions("spark-3.0.0", "hadoop2.7", "hive1.2"))

self.assertEqual(
("spark-3.0.0", "hadoop2.7", "hive1.2"),
checked_versions("3.0.0", "2.7", "1.2"))

self.assertEqual(
("spark-2.4.1", "without-hadoop", "hive2.3"),
checked_versions("2.4.1", "without", "2.3"))

self.assertEqual(
("spark-3.0.1", "without-hadoop", "hive2.3"),
checked_versions("spark-3.0.1", "without-hadoop", "hive2.3"))

# Negative test cases
for (hadoop_version, hive_version) in UNSUPPORTED_COMBINATIONS:
with self.assertRaisesRegex(RuntimeError, 'Hive.*should.*Hadoop'):
checked_versions(
spark_version=test_version,
hadoop_version=hadoop_version,
hive_version=hive_version)

with self.assertRaisesRegex(RuntimeError, "Spark version should start with 'spark-'"):
checked_versions(
spark_version="malformed",
hadoop_version=DEFAULT_HADOOP,
hive_version=DEFAULT_HIVE)

with self.assertRaisesRegex(RuntimeError, "Spark distribution.*malformed.*"):
checked_versions(
spark_version=test_version,
hadoop_version="malformed",
hive_version=DEFAULT_HIVE)

with self.assertRaisesRegex(RuntimeError, "Spark distribution.*malformed.*"):
checked_versions(
spark_version=test_version,
hadoop_version=DEFAULT_HADOOP,
hive_version="malformed")

with self.assertRaisesRegex(RuntimeError, "Hive 1.2 should only be with Hadoop 2.7"):
checked_versions(
spark_version=test_version,
hadoop_version="hadoop3.2",
hive_version="hive1.2")


if __name__ == "__main__":
from pyspark.tests.test_install_spark import * # noqa: F401

try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)