-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available #3119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Can one of the admins verify this patch? |
private val shortCompressionCodecNames = Map( | ||
"lz4" -> classOf[LZ4CompressionCodec].getName, | ||
"lzf" -> classOf[LZFCompressionCodec].getName, | ||
"snappy" -> classOf[SnappyCompressionCodec].getName) | ||
|
||
def createCodec(conf: SparkConf): CompressionCodec = { | ||
createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) | ||
conf.getOption(configKey) | ||
.map(createCodec(conf, _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indented too far (Spark's convention is 2 spaces for the continuation line).
Just small nits. re: testing, can't you write a test where you set the system property to make snappy fail as part of that test? |
createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) | ||
conf.getOption(configKey) | ||
.map(createCodec(conf, _)) | ||
.orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I know I suggested this, but I think this doesn't do the same thing as the PR description says.
If the config option exists and the requested codec is not available, this will always try the default codecs. Instead, this should probably be something like:
if (conf.contains(configKey)) {
createCodec(...).getOrElse(throw new Blah("requested codec cannot be loaded"))
} else
createCodec(DEFAULT).orElse(createCodec(FALLBACK)).getOrElse(throw new Blah("cannot load defaults"))
}
Can one of the admins verify this patch? |
Could this instead just throw an exception when Snappy is configured but not supported? We typically try not to silently mutate configs in the background in favor of giving users an actionable exception. I think this could be accomplished by just modifying The current approach could lead to very confusing failure behavior. For instance say a user has the Snappy native library installed on some machines but not others. What will happen is that there will be a stream corruption exception somewhere inside of Spark where one node writes data as Snappy and another reads it as LZF. And to figure out what caused this a user will have to troll through executor logs for a somewhat innocuous looking @rxin designed this codec interface (I think) so maybe he has more comments also. |
I agree with @pwendell - it'd be better to just fail fast here rather than picking LZF based on the the availability of the library. |
@vanzin regarding testing... yes, I did try what you recommended - I added a note in the commit message. Snappy does some static initialization so if it was unavailable when you first referenced the class, it will never be available again. So i can create a unit test but then all the other snappy unit tests fail. This is why i mentioned that I would need multiple classloaders to get around this issue. |
BTW with sort based shuffle, it might be ok to roll back the default compression to lzf, since we no longer open a large number of concurrent streams. It is now only one per map task. |
@pwendell good point. I'll just make this throw an exception when the you try to create a SnappyCompressionCodec and snappy is not available. I think that is better than waiting until you create an input or output stream - fails faster. |
|
||
private[spark] object CompressionCodec { | ||
private[spark] object CompressionCodec extends Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extends Logging
is not needed anymore?
6d3d79d
to
3847eef
Compare
@pwendell Can you please trigger the jenkins tests for this pr? |
Jenkins, test this please. Hm... they should be retriggering automatically. |
Test build #23505 has started for PR 3119 at commit
|
Test build #23505 has finished for PR 3119 at commit
|
Test PASSed. |
@pwendell Can you please review? Is there anything else we need before we can merge? |
It's not blocking 1.2 so I have other things on the plate. Should be able to get to it soon. |
|
||
def isAvailable() : Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This trait is exposed so adding a method here will actually break binary compatibility with any user-defined compression codecs. We've usually tried not to do this in the past unless we had a situation where there was no work around, so what about just dealing with this directly inside the Snappy codec? I.e. just check when the snappy codec is instantiated and throw this exception if it's not available. It's slightly less general, but a simpler patch and we don't risk breaking user-defined codecs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way - the fact that you cannot safely add a method with a default implementation to a trait is not intuitive at all. You can checkout this article for more information why this is:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh geez. thanks for the link. it makes sense but yes, not what you would expect from traits. I'll update the pr.
By default, snappy is the compression codec used. If Snappy is not available, Spark currently throws a stack trace. Now Spark falls back to LZF if Snappy is not available on the cluster and logs a warning message. The only exception is if the user has explicitly set spark.io.compression.codec=snappy. In this case, if snappy is not available, an IllegalArgumentException is thrown. Because of the way the Snappy library uses static initialization, it was very difficult in a unit test to simulate Snappy not being available. The only way I could think of was to create multiple classloaders which seemed excessive. As a result, most of this was tested adhoc on a test cluster by modifying the system property: org.xerial.snappy.use.systemlib=true which caused Snappy to not load and thus triggering this logic.
Removed the fallback logic and now just throws an IllegalArgumentException if you specify a bad codec or one that is not available like Snappy.
Adds more information to the exception thrown when Snappy is not available.
2e87672
to
63bfdd0
Compare
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { | |||
@DeveloperApi | |||
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { | |||
|
|||
try { | |||
Snappy.getNativeLibraryVersion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we instead check for Snappy.isLoaded()
? This way seems more roundabout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pwendell can you point to the api you are talking about?
https://github.com/xerial/snappy-java/tree/1.1.1.6/src/main/java/org/xerial/snappy
I don't see a Snappy.isLoaded() API. There is a member variable in SnappyLoader but that is private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about this, but it's private: https://github.com/xerial/snappy-java/blob/master/src/main/java/org/xerial/snappy/SnappyLoader.java#L83
Current approach LGTM. Had just one small question. |
This commit consolidates some of the exceptions thrown if compression codecs are not available. If a bad configuration string was passed in, a ClassNotFoundException was through. Also, if Snappy was not available, it would throw an InvocationTargetException when the codec was being used (not when it was being initialized). Now, an IllegalArgumentException is thrown when a codec is not available at creation time - either because the class does not exist or the codec itself is not available in the system. This will allow us to have a better message and fail faster.