[SPARK-12868][SQL] Allow adding jars from hdfs #17342
Conversation
Test build #74790 has finished for PR 17342 at commit
Jenkins, test this please
Test build #74792 has finished for PR 17342 at commit
Hi, @rxin Could you please review this PR? Thanks.
Generally LGTM, just some small comments.
private var hdfsHandler: URLStreamHandler = _

def createURLStreamHandler(protocol: String): URLStreamHandler = {
  if (protocol.compareToIgnoreCase("hdfs") == 0) {
It looks like you only support HDFS here; FsUrlStreamHandlerFactory can actually support different Hadoop-compatible file systems. Should we also support others, like wasb or webhdfs?
Yeah, that's a good point. I'll check with Hadoop for all supported file systems, and ideally see if we can get them via some API.
API? No, just fs.*.impl for the standard ones, plus discovery via META-INF/services, and you don't want to go there. Probably better to have a core list of the Hadoop redists (including the new 2.8+ adl & oss object stores), and the google cloud URL (gss?)
I am not sure which file systems FsUrlStreamHandlerFactory supports. Maybe for now we just put "hdfs" in, and we can add more when users actually need them?
I don't think you have to do the comparison here; Hadoop itself will find the supported FS through fs.*.impl and the service loader.
Note that HDI uses wasb by default, so your assumption here ("hdfs") may potentially break their code.
This leads to the question below: why do we need to wrap FsUrlStreamHandlerFactory here? The only difference is that you add one more check for whether the protocol is hdfs or not. I think that is unnecessary and already handled by Hadoop, as sketched below.
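A minimal sketch of the resolution being described (the explicit `fs.hdfs.impl` binding is only an illustration and requires hadoop-hdfs on the classpath; it is not required config, since Hadoop falls back to the ServiceLoader registry):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hadoop maps a URI scheme to an implementation class via `fs.<scheme>.impl`;
// if no entry is set, it consults the ServiceLoader registry under
// META-INF/services, so callers never need to hard-code a scheme list.
val conf = new Configuration()
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
val implClass = conf.getClass("fs.hdfs.impl", null, classOf[FileSystem])
println(implClass) // class org.apache.hadoop.hdfs.DistributedFileSystem
```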
FWIW, we've been backing out of using service discovery for the filesystem clients built into Hadoop (e.g. HADOOP-14138). Why? It was hurting startup times, especially once we'd switched to the fully-shaded-own-Jackson version of the AWS SDK. From Hadoop 2.8+, at least for now, you get the list of internal ones from a scan of config options. But we reserve the right to change that in future.
I'd be amenable to having an API call in which FileSystem lists all URL schemas the JVM knows about. That doesn't mean that they will load, only that it knows the implementation classname.
I've also considered having a config option which lists all schemas it knows are object stores: a simple comma-separated list, where we could include things like google gss:, even though it's not bundled. Why? It lets apps downstream such as Spark, Hive and Flink see whether a filesystem is an object store without having to add a whole new API, and if it's in the list, expect different behaviours, like expensive renames. That, being just an overridable config option, is inexpensive to add.
Thanks @steveloughran. Shall I hold this PR and wait for the API or config option to be ready? Are they on your schedule? Or shall I just finish this PR first and then make changes when the new API is ready?
IMHO, we should not rely on a Hadoop 2.8+ feature; Spark's supported version is 2.6, so it would be better to have a general solution (one that avoids depending on a specific version of Hadoop).
@jerryshao I was thinking about using reflection to check whether the API exists and, if it does, use it for a complete solution. Maybe it's not worth it. I'll just support hdfs for now.
Sorry, missed this. There's nothing explicit in 2.8+ right now; don't hold your breath. If people do want to co-dev one, I'd be happy to help. There's no point in me implementing something which isn't useful / won't be used by downstream projects.
@@ -148,6 +149,8 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {

object SharedState {

  URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory())
IMHO, directly registering FsUrlStreamHandlerFactory with URL#setURLStreamHandlerFactory should be enough; it is not necessary to wrap it again.
URL#setURLStreamHandlerFactory can only be called once per JVM. If we used FsUrlStreamHandlerFactory directly, we wouldn't be able to support other factories. I wrapped it for future extensibility.
Can you explain more? I don't see a specific difference between your change and FsUrlStreamHandlerFactory regarding being called once per JVM.
We can simply call URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) and everything works happily. The only problem is that URL.setURLStreamHandlerFactory can only be called once per JVM (check the javadoc of the URL class), so if we want to support more stream handlers in the future, we wouldn't be able to call URL.setURLStreamHandlerFactory again. It's like having only one wall plug but several laptops: you have to use a power strip. A sketch of that idea follows.
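A minimal sketch of the "power strip" (the class name is hypothetical, not the PR's actual code): register one composite factory with the JVM and let it delegate per protocol. Falling through on null also matches the later suggestion to proxy to other factories when FsUrlStreamHandlerFactory#createURLStreamHandler returns null.

```scala
import java.net.{URL, URLStreamHandler, URLStreamHandlerFactory}
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

// One factory is registered with the JVM; more delegates can be added later.
class CompositeUrlStreamHandlerFactory(delegates: Seq[URLStreamHandlerFactory])
    extends URLStreamHandlerFactory {
  // Return the first non-null handler; returning null lets the JVM fall
  // back to its built-in handlers (http, file, jar, ...).
  override def createURLStreamHandler(protocol: String): URLStreamHandler =
    delegates.iterator
      .map(_.createURLStreamHandler(protocol))
      .find(_ != null)
      .orNull
}

// The single permitted registration per JVM.
URL.setURLStreamHandlerFactory(
  new CompositeUrlStreamHandlerFactory(Seq(new FsUrlStreamHandlerFactory())))
```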
But with your wrapper SparkUrlStreamHandlerFactory, why can it be called more than once? I didn't get the point here.
Hi, @vanzin @gatorsmile Could you please give some direction on which way to go? We need to support hdfs anyway.
@weiqingy, even if you add a wrapper to try to support different stream handler factories, it is not a good idea to filter on hdfs only. FsUrlStreamHandlerFactory supports different fs implementations by default; it is not necessary for the upstream code to check deliberately.
You could proxy to other URLStreamHandlerFactory instances when FsUrlStreamHandlerFactory#createURLStreamHandler returns null.
What @jerryshao said.
But I also don't see the need to create any abstraction until it's necessary, so really there's no point in implementing it now. If you want to use the hypothetical argument of supporting a new FS, I'll give you the argument that such an FS would be implemented as a FileSystem and would automatically hook up to FsUrlStreamHandlerFactory, so there would be no need to modify Spark.
Thanks for the comments. I have updated the PR.
def createURLStreamHandler(protocol: String): URLStreamHandler = {
  if (protocol.compareToIgnoreCase("hdfs") == 0) {
    if (hdfsHandler == null) {
      hdfsHandler = new FsUrlStreamHandlerFactory().createURLStreamHandler(protocol)
I think you should call the FsUrlStreamHandlerFactory(Configuration conf) constructor here, using the Configuration created by SparkHadoopUtil.
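A sketch of that suggestion (assuming Spark 2.x, where org.apache.spark.deploy.SparkHadoopUtil is accessible, and the PR's hdfsHandler field):

```scala
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
import org.apache.spark.deploy.SparkHadoopUtil

// Reuse the Hadoop Configuration Spark already builds (it carries the
// user's fs.*.impl settings) instead of letting the factory construct a
// fresh default Configuration of its own.
val factory = new FsUrlStreamHandlerFactory(SparkHadoopUtil.get.conf)
val hdfsHandler = factory.createURLStreamHandler("hdfs")
```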
Thanks, I'll follow your suggestion.
Can you update the title to add
CC @vanzin @tgravescs , can you please also review this PR? Thanks.
Test build #75061 has finished for PR 17342 at commit
URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory())

// if 'hdfs' is not supported, MalformedURLException will be thrown
new URL("hdfs://docs.oracle.com/test.jar")
You should check what happens when you run this test on a machine with no network connection. Everyone hates tests that fail when they rely on DNS working (or, in some cases, on DNS not resolving an example.org domain).
The test also works without a network connection.
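The reason it works offline, in a short sketch: the URL constructor only resolves a protocol handler; it does not touch the network.

```scala
import java.net.URL

// Constructing a URL only looks up a registered handler for the scheme,
// failing fast with MalformedURLException for unknown schemes; no DNS
// lookup or connection happens at this point. Network I/O starts only
// with openConnection()/openStream().
val url = new URL("hdfs://docs.oracle.com/test.jar") // no network I/O here
```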
Test build #75885 has started for PR 17342 at commit
Test build #75886 has started for PR 17342 at commit
Test build #75887 has finished for PR 17342 at commit
The failures, I think, were not triggered by this code change. Will re-trigger Jenkins.
Jenkins, test this please
LGTM aside from a minor comment.
@@ -146,6 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}

object SharedState {
  URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
I'm wondering if it's better to add a try..catch around this:
scala> URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
scala> URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
java.lang.Error: factory already defined
at java.net.URL.setURLStreamHandlerFactory(URL.java:1112)
... 48 elided
Normally this wouldn't matter, but if someone is messing with class loaders (e.g. running Spark embedded in a web app in a servlet container), they may run into situations where this code runs twice, or it may even fail the first time (if the user's application also installs a stream handler).
So I think it's safer to catch the error and print a warning message here. Really optimal would be if the "add jar" code didn't use URL at all for this, but that's for a future change.
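A minimal sketch of the suggested guard (assuming it lives in a class extending Spark's org.apache.spark.internal.Logging, which provides logWarning):

```scala
import java.net.URL
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

// URL.setURLStreamHandlerFactory throws java.lang.Error ("factory already
// defined") if any factory was installed earlier in this JVM, e.g. by a
// servlet container or by the user's own application. Warn instead of failing.
try {
  URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
} catch {
  case _: Error =>
    logWarning("URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory")
}
```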
Good point. I have updated the PR. Thanks.
Hi, @vanzin Could you please help to review this PR again? Thanks.
Test build #75905 has finished for PR 17342 at commit
Test build #75973 has started for PR 17342 at commit
Test build #76000 has finished for PR 17342 at commit
// if 'hdfs' is not supported, MalformedURLException will be thrown
new URL(jarFromHdfs)
var exceptionThrown: Boolean = false
Replace this whole block with:
intercept[MalformedURLException] {
  new URL(jarFromInvalidFs)
}
Thanks. PR has been updated.
Test build #76128 has finished for PR 17342 at commit
Nobody else seems to have comments, so I'll merge to master / 2.2.
## What changes were proposed in this pull request?

Spark 2.2 is going to be cut; it'll be great if SPARK-12868 can be resolved before that. There have been several PRs for this, like [PR#16324](#16324), but all of them have been inactive for a long time or have been closed.

This PR adds a SparkUrlStreamHandlerFactory, which relies on the 'protocol' to choose the appropriate UrlStreamHandlerFactory, like FsUrlStreamHandlerFactory, to create the URLStreamHandler.

## How was this patch tested?

1. Added a new unit test.
2. Checked manually.

Before: throws an exception with "failed unknown protocol: hdfs"
<img width="914" alt="screen shot 2017-03-17 at 9 07 36 pm" src="https://cloud.githubusercontent.com/assets/8546874/24075277/5abe0a7c-0bd5-11e7-900e-ec3d3105da0b.png">

After:
<img width="1148" alt="screen shot 2017-03-18 at 11 42 18 am" src="https://cloud.githubusercontent.com/assets/8546874/24075283/69382a60-0bd5-11e7-8d30-d9405c3aaaba.png">

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #17342 from weiqingy/SPARK-18910.

(cherry picked from commit 2ba1eba)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Thanks, @vanzin .
Have you tried it in yarn-client mode? I added this patch in v2.1.1 + Hadoop 2.6.0; when I run "add jar" through the SparkSQL CLI, it comes out with this error:
I'm going to recommend you file a SPARK bug on issues.apache.org, and an HDFS one linked to it: "NPE in BlockReaderFactory log init". It looks like the creation of the LOG for BlockReader is triggering introspection, which is triggering the BlockReaderFactory to do something before it's fully inited, and then possibly NPE-ing as the LOG field is null. (It's due to Commons Logging examining all CP entries for commons-logging config files, and the one for HDFS isn't yet functional, obviously.) HDFS is all on SLF4J and may not have this issue, but if so, it's by accident, not design. PS: thanks for the great stack trace! Can you add it as the first comment in your bug reports, not in the actual report entry? That way every email we get on the JIRA update doesn't lose the conversation under the stack. Thanks
@steveloughran Thanks Steve.
Created: SPARK-21697 with the stack trace attached.
@steveloughran Sorry for the delay, and much appreciated for creating the issue SPARK-21697.
@Chopinxb No worries; the hard part is thinking about how to fix this. I don't see it being possible to do reliably except through an explicit download. Hadoop 2.8+ has moved off commons-logging, so this problem may have gone away. However, there are too many dependencies to be confident that will hold.
Patching this to spark-2.1.0, I had several issues.
At a guess, there's possibly a mix of Hadoop HDFS JARs on your classpath. Are you sure everything on the classpath is in sync? What JARs containing hadoop-hdfs are there?
Will check, but it looks like it's related to SPARK-21697.
Hmmm. SPARK-21697 has a lot of the CP, but the problem in that one is some recursive loading of artifacts off HDFS, the scan for commons-logging.properties being the trouble spot. @rajeshcode, what you have seems more like a classic "class not found" problem, where one class is loading but a dependency isn't being found. And as HDFS has moved its stuff around, splitting the one hadoop-hdfs JAR into separate client and server JARs, that may be the cause.
Sorry about the above stack. The actual error is as below; that's why it relates to SPARK-21697: 18/08/20 11:47:29 ERROR SparkSQLDriver: Failed in [select check('abc')]
Well, no obvious answer there I'm afraid, except "don't put HDFS JARs on the classpath"; if you serve them up via HTTP, all should work.
### What changes were proposed in this pull request?

Add a property `spark.fsUrlStreamHandlerFactory.enabled` to allow users to turn off the default registration of `org.apache.hadoop.fs.FsUrlStreamHandlerFactory`.

### Why are the changes needed?

[SPARK-25694](https://issues.apache.org/jira/browse/SPARK-25694) is a long-standing issue. Originally, [[SPARK-12868][SQL] Allow adding jars from hdfs](#17342) added this for better Hive support. However, it has a side effect when users run Apache Spark without `-Phive`: it causes exceptions when they try to use other custom factories or a 3rd-party library that tries to set this. This configuration will unblock those non-Hive users.

### Does this PR introduce any user-facing change?

Yes. This provides a new user-configurable property. By default, the behavior is unchanged.

### How was this patch tested?

Manual testing.

**BEFORE**
```
$ build/sbt package
$ bin/spark-shell

scala> sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> java.net.URL.setURLStreamHandlerFactory(new org.apache.hadoop.fs.FsUrlStreamHandlerFactory())
java.lang.Error: factory already defined
  at java.net.URL.setURLStreamHandlerFactory(URL.java:1134)
  ... 47 elided
```

**AFTER**
```
$ build/sbt package
$ bin/spark-shell --conf spark.sql.defaultUrlStreamHandlerFactory.enabled=false

scala> sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> java.net.URL.setURLStreamHandlerFactory(new org.apache.hadoop.fs.FsUrlStreamHandlerFactory())
```

Closes #26530 from jiangzho/master.

Lead-authored-by: Zhou Jiang <zhou_jiang@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: zhou-jiang <zhou_jiang@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
[SPARK-25694][SQL][FOLLOW-UP] Move 'spark.sql.defaultUrlStreamHandlerFactory.enabled' into StaticSQLConf.scala

This PR is a follow-up of apache#26530 and proposes to move the configuration `spark.sql.defaultUrlStreamHandlerFactory.enabled` to `StaticSQLConf.scala` for consistency: it puts similar configurations together and improves readability. No user-facing change. Manually tested as described in apache#26530.

Closes apache#26570 from HyukjinKwon/SPARK-25694.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 8469614)