
[SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available #3119


Closed
wants to merge 5 commits into from

Conversation

ksakellis

This commit consolidates some of the exceptions thrown when compression codecs are not available. Previously, if a bad configuration string was passed in, a ClassNotFoundException was thrown. Likewise, if Snappy was not available, an InvocationTargetException was thrown when the codec was first used (not when it was initialized). Now an IllegalArgumentException is thrown when a codec is not available at creation time, either because the class does not exist or because the codec itself is not available on the system. This allows a better error message and faster failure.
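The fail-fast behavior described above can be sketched roughly as follows. This is a self-contained sketch, not the actual patch: `DummyCodec` and `CodecFactory` are hypothetical stand-ins for the real codec classes and `CompressionCodec.createCodec`.

```scala
// Hypothetical sketch: resolve a short codec name, instantiate it reflectively,
// and surface any failure as a single IllegalArgumentException at creation time.
class DummyCodec // stands in for a real codec class such as LZFCompressionCodec

object CodecFactory {
  private val shortNames = Map("dummy" -> classOf[DummyCodec].getName)

  def createCodec(codecName: String): AnyRef = {
    val className = shortNames.getOrElse(codecName.toLowerCase, codecName)
    try {
      Class.forName(className).getConstructor().newInstance().asInstanceOf[AnyRef]
    } catch {
      // ClassNotFoundException, InstantiationException, InvocationTargetException,
      // etc. all extend ReflectiveOperationException, so one catch covers them.
      case e: ReflectiveOperationException =>
        throw new IllegalArgumentException(s"Codec [$codecName] is not available.", e)
    }
  }
}
```

The point of the consolidation is that callers only ever see one exception type, with a message naming the codec, instead of a mix of ClassNotFoundException and InvocationTargetException depending on how the failure happened.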

@AmplabJenkins

Can one of the admins verify this patch?

private val shortCompressionCodecNames = Map(
  "lz4" -> classOf[LZ4CompressionCodec].getName,
  "lzf" -> classOf[LZFCompressionCodec].getName,
  "snappy" -> classOf[SnappyCompressionCodec].getName)

def createCodec(conf: SparkConf): CompressionCodec = {
-   createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+   conf.getOption(configKey)
+     .map(createCodec(conf, _))
Contributor

nit: indented too far (Spark's convention is 2 spaces for the continuation line).

@vanzin
Contributor

vanzin commented Nov 5, 2014

Just small nits. re: testing, can't you write a test where you set the system property to make snappy fail as part of that test?

-   createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+   conf.getOption(configKey)
+     .map(createCodec(conf, _))
+     .orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC))
Contributor

So, I know I suggested this, but I think this doesn't do the same thing as the PR description says.

If the config option exists and the requested codec is not available, this will always try the default codecs. Instead, this should probably be something like:

if (conf.contains(configKey)) {
  createCodec(...).getOrElse(throw new Blah("requested codec cannot be loaded"))
} else {
  createCodec(DEFAULT).orElse(createCodec(FALLBACK)).getOrElse(throw new Blah("cannot load defaults"))
}
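Fleshed out, the suggested control flow looks like the following self-contained sketch. The names and the `tryLoad` helper are hypothetical; here only "lzf" is pretend-loadable, to exercise both branches.

```scala
// Runnable sketch of the suggested logic: an explicitly configured codec must
// load or we fail; otherwise fall back from the default to a known-good codec.
object CodecSelection {
  val Default = "snappy"
  val Fallback = "lzf"

  // Hypothetical loader: pretend only "lzf" is available on this machine.
  def tryLoad(name: String): Option[String] =
    if (name == "lzf") Some(name) else None

  def select(configured: Option[String]): String = configured match {
    case Some(name) =>
      // User asked for a specific codec: never silently substitute another one.
      tryLoad(name).getOrElse(
        throw new IllegalArgumentException(s"Requested codec [$name] cannot be loaded"))
    case None =>
      // Nothing configured: try the default, then the fallback.
      tryLoad(Default).orElse(tryLoad(Fallback)).getOrElse(
        throw new IllegalArgumentException("Cannot load default codecs"))
  }
}
```

Note that later in the thread the silent-fallback branch was dropped in favor of always failing fast, so only the first branch's behavior survived into the final patch.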

@AmplabJenkins

Can one of the admins verify this patch?

@pwendell
Contributor

pwendell commented Nov 9, 2014

Could this instead just throw an exception when Snappy is configured but not supported? We typically try not to silently mutate configs in the background in favor of giving users an actionable exception. I think this could be accomplished by just modifying SnappyCompressionCodec to guard the creation of an input stream or output stream with a check as to whether Snappy is enabled, and throw an exception if it is not enabled.

The current approach could lead to very confusing failure behavior. For instance say a user has the Snappy native library installed on some machines but not others. What will happen is that there will be a stream corruption exception somewhere inside of Spark where one node writes data as Snappy and another reads it as LZF. And to figure out what caused this a user will have to troll through executor logs for a somewhat innocuous looking WARN statement.

@rxin designed this codec interface (I think) so maybe he has more comments also.

@rxin
Contributor

rxin commented Nov 10, 2014

I agree with @pwendell - it'd be better to just fail fast here rather than picking LZF based on the availability of the library.

@ksakellis
Author

@vanzin regarding testing... yes, I did try what you recommended - I added a note in the commit message. Snappy does some static initialization, so if it is unavailable when the class is first referenced, it will never become available again. So I can create a unit test, but then all the other Snappy unit tests fail. This is why I mentioned that I would need multiple classloaders to get around this issue.

@rxin
Contributor

rxin commented Nov 10, 2014

BTW with sort based shuffle, it might be ok to roll back the default compression to lzf, since we no longer open a large number of concurrent streams. It is now only one per map task.

@ksakellis
Author

@pwendell good point. I'll just make this throw an exception when you try to create a SnappyCompressionCodec and Snappy is not available. I think that is better than waiting until you create an input or output stream - it fails faster.

@ksakellis ksakellis changed the title [SPARK-4079] [CORE] Default to LZF if Snappy not available [SPARK-4079] [CORE] Consolidates Errors if a CompressionCodec is not available Nov 13, 2014

-private[spark] object CompressionCodec {
+private[spark] object CompressionCodec extends Logging {
Contributor

extends Logging is not needed anymore?

@ksakellis
Author

@pwendell Can you please trigger the jenkins tests for this pr?

@pwendell
Contributor

Jenkins, test this please. Hm... they should be retriggering automatically.

@SparkQA

SparkQA commented Nov 17, 2014

Test build #23505 has started for PR 3119 at commit 3847eef.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 17, 2014

Test build #23505 has finished for PR 3119 at commit 3847eef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23505/

@ksakellis
Author

@pwendell Can you please review? Is there anything else we need before we can merge?

@pwendell
Contributor

It's not blocking 1.2 so I have other things on the plate. Should be able to get to it soon.


def isAvailable(): Boolean = true
Contributor

This trait is exposed so adding a method here will actually break binary compatibility with any user-defined compression codecs. We've usually tried not to do this in the past unless we had a situation where there was no work around, so what about just dealing with this directly inside the Snappy codec? I.e. just check when the snappy codec is instantiated and throw this exception if it's not available. It's slightly less general, but a simpler patch and we don't risk breaking user-defined codecs.
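The simpler approach suggested here - guarding inside the Snappy codec itself rather than extending the public trait - might look like the following sketch. `NativeLib` is a hypothetical stand-in for the snappy-java call that triggers native-library loading; the real patch calls `Snappy.getNativeLibraryVersion`.

```scala
// Sketch: force native-library loading in the codec constructor and convert
// the resulting linkage Error into an actionable IllegalArgumentException.
object NativeLib {
  var available = true // flip to false to simulate a missing native library
  def version: String =
    if (available) "1.1.1" else throw new UnsatisfiedLinkError("no native snappy")
}

class SnappyCodecSketch {
  try {
    NativeLib.version // real code would call Snappy.getNativeLibraryVersion here
  } catch {
    // UnsatisfiedLinkError extends Error, not Exception, so catch Error here.
    case e: Error =>
      throw new IllegalArgumentException("Snappy is not available.", e)
  }
}
```

Because the check runs in the constructor, a misconfigured node fails loudly at codec creation instead of producing the cross-node stream-corruption scenario described earlier in the thread.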

Contributor

By the way - the fact that you cannot safely add a method with a default implementation to a trait is not intuitive at all. You can checkout this article for more information why this is:

http://stackoverflow.com/questions/18366817/is-adding-a-trait-method-with-implementation-breaking-backward-compatibility

Author

Oh geez, thanks for the link. It makes sense, but yes, not what you would expect from traits. I'll update the PR.

Kostas Sakellis added 4 commits December 1, 2014 11:04
By default, snappy is the compression codec used.
If Snappy is not available, Spark currently throws
a stack trace. Now Spark falls back to LZF
if Snappy is not available on the cluster and logs
a warning message.

The only exception is if the user has explicitly
set spark.io.compression.codec=snappy. In this
case, if snappy is not available, an
IllegalArgumentException is thrown.

Because of the way the Snappy library uses static
initialization, it was very difficult in a unit test to
simulate Snappy not being available. The only way I
could think of was to create multiple classloaders
which seemed excessive. As a result, most of this was tested
ad hoc on a test cluster by setting the system property
org.xerial.snappy.use.systemlib=true, which caused Snappy
to not load and thus triggered this logic.
Removed the fallback logic and now just throws an
IllegalArgumentException if you specify a bad codec
or one that is not available like Snappy.
Adds more information to the exception thrown
when Snappy is not available.
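The manual test described in the commit message above could be reproduced with an invocation along these lines. This is a hypothetical sketch: the application jar name is a placeholder, and the property is passed through Spark's standard extra-Java-options settings.

```shell
# Force snappy-java to look for a system-installed native library, which is
# usually absent, so native loading fails and the new error path is exercised.
spark-submit \
  --conf spark.io.compression.codec=snappy \
  --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.use.systemlib=true" \
  --conf "spark.executor.extraJavaOptions=-Dorg.xerial.snappy.use.systemlib=true" \
  your-app.jar
```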
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

try {
Snappy.getNativeLibraryVersion
Contributor

Should we instead check for Snappy.isLoaded()? This way seems more roundabout.

Author

@pwendell can you point to the api you are talking about?
https://github.com/xerial/snappy-java/tree/1.1.1.6/src/main/java/org/xerial/snappy
I don't see a Snappy.isLoaded() API. There is a member variable in SnappyLoader but that is private.


@pwendell
Contributor

Current approach LGTM. Had just one small question.

6 participants