[SPARK-32017][PYTHON][BUILD] Make Pyspark Hadoop 3.2+ Variant available in PyPI #29703
Conversation
return spark_version, hadoop_version, hive_version

def install_spark(dest, spark_version, hadoop_version, hive_version):
I basically referred to
spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
Lines 70 to 111 in b84ed41
private def tryDownloadSpark(version: String, path: String): Unit = {
  // Try a few mirrors first; fall back to Apache archive
  val mirrors =
    (0 until 2).flatMap { _ =>
      try {
        Some(getStringFromUrl("https://www.apache.org/dyn/closer.lua?preferred=true"))
      } catch {
        // If we can't get a mirror URL, skip it. No retry.
        case _: Exception => None
      }
    }
  val sites =
    mirrors.distinct :+ "https://archive.apache.org/dist" :+ PROCESS_TABLES.releaseMirror
  logInfo(s"Trying to download Spark $version from $sites")
  for (site <- sites) {
    val filename = s"spark-$version-bin-hadoop2.7.tgz"
    val url = s"$site/spark/spark-$version/$filename"
    logInfo(s"Downloading Spark $version from $url")
    try {
      getFileFromUrl(url, path, filename)
      val downloaded = new File(sparkTestingDir, filename).getCanonicalPath
      val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
      Seq("mkdir", targetDir).!
      val exitCode = Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
      Seq("rm", downloaded).!
      // For a corrupted file, `tar` returns non-zero values. However, we also need to check
      // the extracted file because `tar` returns 0 for empty file.
      val sparkSubmit = new File(sparkTestingDir, s"spark-$version/bin/spark-submit")
      if (exitCode == 0 && sparkSubmit.exists()) {
        return
      } else {
        Seq("rm", "-rf", targetDir).!
      }
    } catch {
      case ex: Exception =>
        logWarning(s"Failed to download Spark $version from $url: ${ex.getMessage}")
    }
  }
  fail(s"Unable to download Spark $version")
}
and SparkR's `install.spark`, lines 68 to 161 in f53d8c6:
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                          localDir = NULL, overwrite = FALSE) {
  sparkHome <- Sys.getenv("SPARK_HOME")
  if (isSparkRShell()) {
    stopifnot(nchar(sparkHome) > 0)
    message("Spark is already running in sparkR shell.")
    return(invisible(sparkHome))
  } else if (!is.na(file.info(sparkHome)$isdir)) {
    message("Spark package found in SPARK_HOME: ", sparkHome)
    return(invisible(sparkHome))
  }
  version <- paste0("spark-", packageVersion("SparkR"))
  hadoopVersion <- tolower(hadoopVersion)
  hadoopVersionName <- hadoopVersionName(hadoopVersion)
  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                     normalizePath(localDir, mustWork = FALSE))
  if (is.na(file.info(localDir)$isdir)) {
    dir.create(localDir, recursive = TRUE)
  }
  if (overwrite) {
    message("Overwrite = TRUE: download and overwrite the tar file",
            "and Spark package directory if they exist.")
  }
  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
  if (releaseUrl != "") {
    packageName <- basenameSansExtFromUrl(releaseUrl)
  }
  packageLocalDir <- file.path(localDir, packageName)
  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
    if (releaseUrl != "") {
      message(packageName, " found, setting SPARK_HOME to ", packageLocalDir)
    } else {
      message(version, " for Hadoop ",
              if (hadoopVersion == "without") "Free build" else hadoopVersion,
              " found, setting SPARK_HOME to ", packageLocalDir)
    }
    Sys.setenv(SPARK_HOME = packageLocalDir)
    return(invisible(packageLocalDir))
  } else {
    message("Spark not found in the cache directory. Installation will start.")
  }
  packageLocalPath <- paste0(packageLocalDir, ".tgz")
  tarExists <- file.exists(packageLocalPath)
  if (tarExists && !overwrite) {
    message("tar file found.")
  } else {
    if (releaseUrl != "") {
      message("Downloading from alternate URL:\n- ", releaseUrl)
      success <- downloadUrl(releaseUrl, packageLocalPath)
      if (!success) {
        unlink(packageLocalPath)
        stop("Fetch failed from ", releaseUrl)
      }
    } else {
      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
    }
  }
  message("Installing to ", localDir)
  # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
  # or, tar command can return failure code
  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
                      error = function(e) {
                        message(e, "\n")
                        FALSE
                      },
                      warning = function(w) {
                        message(w, "\n")
                        FALSE
                      })
  if (!tarExists || overwrite || !success) {
    unlink(packageLocalPath)
    if (success) {
      # if tar file was not there before (or it was, but we are told to overwrite it),
      # and untar is successful - set a flag that we have downloaded (and untar) Spark package.
      assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
    }
  }
  if (!success) stop("Extract archive failed.")
  message("DONE.")
  Sys.setenv(SPARK_HOME = packageLocalDir)
  message("SPARK_HOME set to ", packageLocalDir)
  invisible(packageLocalDir)
}
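For reference, here is a minimal Python sketch of the same flow (preferred mirror first, Apache archive as a fallback). It is illustrative only and not the actual PySpark install code added in this PR; the constants, URL layout, and error handling are assumptions.

```python
# Illustrative sketch only (not the code added by this PR): a Python analogue
# of the Scala/R logic above. URL layout and helper behaviour are assumptions.
import os
import tarfile
import urllib.request

DEFAULT_SITES = [
    "https://archive.apache.org/dist",
    "https://dist.apache.org/repos/dist/release",
]


def install_spark(dest, spark_version, hadoop_version, hive_version):
    """Download and extract a Spark distribution (e.g. spark-3.1.0 / hadoop3.2)
    into `dest`, trying the preferred Apache mirror first and then the archive."""
    # In the real logic, hive_version would also feed into the package name;
    # it is ignored here to keep the sketch short.
    package_name = "%s-bin-%s" % (spark_version, hadoop_version)
    filename = "%s.tgz" % package_name

    sites = []
    try:
        # Ask Apache for the preferred mirror; skip it silently if unreachable.
        with urllib.request.urlopen(
                "https://www.apache.org/dyn/closer.lua?preferred=true") as res:
            sites.append(res.read().decode("utf-8").strip())
    except Exception:
        pass
    sites.extend(DEFAULT_SITES)

    os.makedirs(dest, exist_ok=True)
    for site in sites:
        url = "%s/spark/%s/%s" % (site, spark_version, filename)
        local_tgz = os.path.join(dest, filename)
        try:
            print("Downloading %s" % url)
            urllib.request.urlretrieve(url, local_tgz)
            with tarfile.open(local_tgz, "r:gz") as tar:
                tar.extractall(dest)
            os.remove(local_tgz)
            return  # success
        except Exception as e:
            print("Failed to download from %s: %s" % (site, e))
    raise IOError("Unable to download Spark %s" % spark_version)
```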
cc @srowen, @dongjoon-hyun, @holdenk, @BryanCutler, @viirya, @ueshin FYI
Thank you for pinging me, @HyukjinKwon. cc @gatorsmile. Do you have any opinion on Hive 1.2 at Apache Spark 3.1.0?
print("Downloaded %d of %d bytes (%0.2f%%)" % ( | ||
bytes_so_far, | ||
total_size, | ||
round(float(bytes_so_far) / total_size * 100, 2))) |
There are two purposes for showing the progress:

- The printout will be seen when running `pip install ... -v`.
- The output from plain `pip` without `-v` shows that something is in progress (otherwise it looks like it hangs). For example:

  Building wheel for pyspark (setup.py) ... -
  Building wheel for pyspark (setup.py) ... \
  Building wheel for pyspark (setup.py) ... |
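For context, here is a sketch of how a progress printout like the one above can be wired in as a download report hook; the helper name is hypothetical and not necessarily what the PR uses.

```python
# Hypothetical helper, shown only to illustrate why the progress line matters:
# it is invoked once per downloaded block, so `pip install ... -v` keeps
# printing an updated percentage instead of appearing to hang.
import urllib.request


def report_progress(block_number, block_size, total_size):
    bytes_so_far = block_number * block_size
    if total_size > 0:
        print("Downloaded %d of %d bytes (%0.2f%%)" % (
            bytes_so_far,
            total_size,
            round(float(bytes_so_far) / total_size * 100, 2)))


# Example usage (the URL is a placeholder):
# urllib.request.urlretrieve("https://example.org/spark.tgz", "spark.tgz",
#                            reporthook=report_progress)
```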
@@ -275,6 +275,8 @@ if [[ "$1" == "package" ]]; then
  # In dry run mode, only build the first one. The keys in BINARY_PKGS_ARGS are used as the
  # list of packages to be built, so it's ok for things to be missing in BINARY_PKGS_EXTRA.

  # NOTE: Don't forget to update the valid combinations of distributions at
  # 'python/pyspark/install.py' if you're changing them.
  declare -A BINARY_PKGS_ARGS
  BINARY_PKGS_ARGS["hadoop3.2"]="-Phadoop-3.2 $HIVE_PROFILES"
If we happen to drop Hive 1.2 (or add other combinations of profiles in the distributions), we'll have to change both here and in 'python/pyspark/install.py'. I believe this could be done separately later.
### What changes were proposed in this pull request?
PyPI and CRAN did not change because of the concern about selecting Hadoop and Hive versions. For PyPI, now there is a PR open at #29703. For CRAN, we can already select Hadoop and Hive versions via `SparkR::install.spark`.

### Why are the changes needed?
To keep the default profiles consistent in distributions.

### Does this PR introduce _any_ user-facing change?
Yes, the default distributions will use Hadoop 3.2.

### How was this patch tested?
Jenkins tests.

Closes #29704 from HyukjinKwon/SPARK-32058.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
I think exposing the option is fine.
Some high-level questions:

- How does this interact with the ability to specify dependencies in a requirements file? It seems odd to have to do something like `HADOOP_VERSION=3.2 pip install -r requirements.txt`, because, kinda like with `--install-option`, we've now modified pip's behavior across all the libraries it's going to install.

  I also wonder if this plays well with tools like pip-tools that compile down requirements files into the full list of their transitive dependencies. I'm guessing users will need to manually preserve the environment variables, because they will not be reflected in the compiled requirements.

- Have you considered publishing these alternate builds under different package names, e.g. `pyspark-hadoop3.2`? This avoids the need to mess with environment variables, and delivers a more vanilla install experience. But it will also push us to define upfront what combinations to publish builds for to PyPI.

- Are you sure it's OK to point at `archive.apache.org`? Everyone installing a non-current version of PySpark with alternate versions of Hadoop / Hive specified will hit the archive. Unlike PyPI, the Apache archive is not backed by a generous CDN:

  > Do note that a daily limit of 5GB per IP is being enforced on archive.apache.org, to prevent abuse.

  In Flintrock, I never touch the archive out of fear of being an "abusive user". This is another argument for publishing alternate packages to PyPI.
pass

default_sites = [
    "https://archive.apache.org/dist", "https://dist.apache.org/repos/dist/release"]
All non-current versions of Spark will hit the archive, since the mirrors only maintain the latest version. I don't think the archive will be able to handle the volume of traffic that will eventually come its way from various people downloading (and re-downloading) Spark, e.g. as part of CI setup.
When they install, I think it will likely be the latest version in most cases. I guess that's the reason why we moved the old versions into the archive and keep the latest versions on the mirrors.

People are already using this to download old versions or to set up CI. This PR just makes it easier to do.

I agree that it doesn't look very pip friendly. That's why I had to investigate a lot and write down what I checked in the PR description.

We can just keep this as an experimental mode for the time being, and switch it to the proper pip installation option once they support it in the future.

I have thought about this option too but ..

Yeah, I understand this can be a valid concern. But this is already available to use and people use it. Also, it's being used in our own CI: spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala, line 82 in b84ed41.

The PR makes it easier to use these sites to download old versions. We can also make it configurable by exposing an environment variable.
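A minimal sketch of the "make it configurable via an environment variable" idea, assuming the `PYSPARK_RELEASE_MIRROR` variable this PR exposes; the helper name and the precedence order are illustrative assumptions.

```python
# Sketch only: resolve candidate download sites, letting a user-provided
# mirror take precedence over the preferred Apache mirror and the defaults.
import os

DEFAULT_SITES = [
    "https://archive.apache.org/dist",
    "https://dist.apache.org/repos/dist/release",
]


def candidate_sites(preferred_mirror=None):
    sites = []
    user_mirror = os.environ.get("PYSPARK_RELEASE_MIRROR")
    if user_mirror:
        sites.append(user_mirror)       # explicit user choice wins
    if preferred_mirror:
        sites.append(preferred_mirror)  # e.g. the closer.lua suggestion
    sites.extend(DEFAULT_SITES)         # archive as the last resort
    return sites
```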
Force-pushed from 75dbcaf to 83815e0.
Test build #128639 has finished for PR 29703 at commit
I documented it. I believe this is ready for a review.
I proofread, tested again, and fixed some docs.
if hadoop_version == "without": | ||
hadoop_version = "without-hadoop" |
Is "without-hadoop" also supported as special keyword? Seems not see it is matched here?
It is verified later by `if hadoop_version not in SUPPORTED_HADOOP_VERSIONS:`. There are test cases here: https://github.com/apache/spark/pull/29703/files/033a33ee515b95342e8c5a74e63054d915661579#diff-e23af4eb5cc3bf6af4bc26cb801b7e84R69 and https://github.com/apache/spark/pull/29703/files/033a33ee515b95342e8c5a74e63054d915661579#diff-e23af4eb5cc3bf6af4bc26cb801b7e84R88

Users can also specify the Hadoop and Hive versions as keywords such as `hadoop3.2` and `hive2.3`, but I didn't document this. These keywords are ported from SparkR, as in `SparkR::install.spark`.
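For readers following along, here is a rough sketch of the keyword normalization and validation being described. The exact supported values are an assumption inferred from this thread (Hadoop 2.7/3.2/without, Hive 1.2/2.3), not a copy of the PR's code.

```python
# Assumed supported values, inferred from the discussion in this PR.
SUPPORTED_HADOOP_VERSIONS = ["hadoop2.7", "hadoop3.2", "without-hadoop"]
SUPPORTED_HIVE_VERSIONS = ["hive1.2", "hive2.3"]


def checked_versions(hadoop_version, hive_version):
    """Normalize user input ("3.2", "hadoop3.2", "without", ...) and validate it."""
    hadoop_version = hadoop_version.lower()
    hive_version = hive_version.lower()
    # "without" is accepted as shorthand for the Hadoop-free distribution name.
    if hadoop_version == "without":
        hadoop_version = "without-hadoop"
    elif not hadoop_version.startswith("hadoop") and hadoop_version != "without-hadoop":
        hadoop_version = "hadoop" + hadoop_version
    if not hive_version.startswith("hive"):
        hive_version = "hive" + hive_version
    if hadoop_version not in SUPPORTED_HADOOP_VERSIONS:
        raise RuntimeError("Unsupported Hadoop version: %s" % hadoop_version)
    if hive_version not in SUPPORTED_HIVE_VERSIONS:
        raise RuntimeError("Unsupported Hive version: %s" % hive_version)
    return hadoop_version, hive_version
```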
Test build #128789 has finished for PR 29703 at commit
Test build #128793 has finished for PR 29703 at commit
Force-pushed from 09997b7 to 058e61a.
Test build #128903 has finished for PR 29703 at commit
LGTM otherwise.
I tested again on both Windows and Mac.
Test build #128956 has finished for PR 29703 at commit
Test build #128961 has finished for PR 29703 at commit
Test build #128960 has finished for PR 29703 at commit
retest this please
Test build #128973 has finished for PR 29703 at commit
Merged to master.
Thanks @viirya, @ueshin, @nchammas, @srowen and @dongjoon-hyun for reviewing this.
…OOP_VERSION in pip installation option

### What changes were proposed in this pull request?
This PR is a followup of #29703. It renames the `HADOOP_VERSION` environment variable to `PYSPARK_HADOOP_VERSION` in case `HADOOP_VERSION` is already being used somewhere. Arguably `HADOOP_VERSION` is a pretty common name. I see it here and there:

- https://www.ibm.com/support/knowledgecenter/SSZUMP_7.2.1/install_grid_sym/understanding_advanced_edition.html
- https://cwiki.apache.org/confluence/display/ARROW/HDFS+Filesystem+Support
- http://crs4.github.io/pydoop/_pydoop1/installation.html

### Why are the changes needed?
To avoid the environment variable being unexpectedly conflicted.

### Does this PR introduce _any_ user-facing change?
It renames the environment variable, but it's not released yet.

### How was this patch tested?
Existing unit tests will cover it.

Closes #31028 from HyukjinKwon/SPARK-32017-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…OOP_VERSION in pip installation option (same change as the commit above; cherry picked from commit 329850c). Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?

This PR proposes to add a way to select the Hadoop and Hive versions in pip installation. Users can select the Hive or Hadoop versions as below:
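As a hedged illustration based on the environment variables discussed in this PR (the exact documented commands may differ): selecting a version looks roughly like `HADOOP_VERSION=3.2 pip install pyspark`, optionally with `PYSPARK_RELEASE_MIRROR` pointing at a preferred download site. Note that the follow-up commit above later renamed `HADOOP_VERSION` to `PYSPARK_HADOOP_VERSION`.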
When the environment variables are set, PySpark internally downloads the corresponding Spark version and then sets the Spark home to it. This PR also exposes a mirror that can be set via the environment variable `PYSPARK_RELEASE_MIRROR`.

Please NOTE that:

- We cannot currently leverage pip's native installation option, for example `pip install pyspark --install-option="hadoop3.2"`. This is because of a limitation and bug in pip itself. Once they fix this issue, we can switch from the environment variables to the proper installation options; see SPARK-32837. It IS possible to work around this, but only in a very ugly or hacky way with a big change. See this PR as an example.

- In pip installation, we pack the relevant jars together. This PR does not touch the existing packaging, in order to prevent any behaviour changes. Once this experimental way is proven to be safe, we can avoid packing the relevant jars together (keeping only the relevant Python scripts) and download the Spark distribution as this PR proposes.
- This way is sort of consistent with SparkR:

  SparkR provides a method `SparkR::install.spark` to support CRAN installation. This is fine because SparkR is provided purely as an R library; for example, the `sparkr` script is not packed together. PySpark cannot take this approach because the PySpark packaging ships the relevant executable scripts together, e.g. the `pyspark` shell. If PySpark had a method such as `pyspark.install_spark`, users could not call it in `pyspark` because `pyspark` already assumes the relevant Spark is installed, the JVM is launched, etc.

- There looks to be no way to publish releases that contain different Hadoop or Hive versions to PyPI due to the version semantics. This is not an option.

- The usual way looks to be either `--install-option` above with hacks, or environment variables, given my investigation.

### Why are the changes needed?
To provide users the option to select the Hadoop and Hive versions.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to select the Hive and Hadoop versions when they install PySpark from `pip`, as described above.

### How was this patch tested?

Unit tests were added. I also manually tested on Mac and Windows (after building Spark with `python/dist/pyspark-3.1.0.dev0.tar.gz`):

Mac:

Windows: