
Spark: Fix a separate table cache being created for each rewriteFiles #5392

Merged (7 commits) on Nov 27, 2022

Conversation

manuzhang (Collaborator):

Currently, during Spark's rewrite data files procedure with the bin-pack strategy, the SparkSession is cloned to disable AQE in each rewriteFiles call. Since a cloned SparkSession has its own state, the V2SessionCatalog is reloaded every time and a separate table cache is created. As a result, each file group gets its own table cache, which effectively disables table caching.

This PR fixes the issue by cloning the SparkSession once, when creating SparkBinPackStrategy.
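The effect described above can be modeled without Spark at all. Below is a minimal, hypothetical Python sketch (the Session, clone, and load_table names are illustrative, not Spark's API) showing why cloning a session per file group defeats the table cache:

```python
class Session:
    """Toy stand-in for a SparkSession: each session owns its own table cache."""

    def __init__(self):
        self.table_cache = {}    # fresh cache per session state
        self.catalog_loads = 0   # counts expensive catalog round-trips

    def clone(self):
        # Like SparkSession.cloneSession(): config is copied, but session
        # state is new, so the clone starts with an empty table cache.
        return Session()

    def load_table(self, name):
        if name not in self.table_cache:
            self.catalog_loads += 1  # cache miss: hits the external catalog
            self.table_cache[name] = object()
        return self.table_cache[name]

base = Session()

# Before the fix: one clone per file-group rewrite -> a cache miss every time.
loads_before = 0
for _ in range(5):
    per_group = base.clone()
    per_group.load_table("db.t")
    loads_before += per_group.catalog_loads

# After the fix: clone once per action and reuse it -> one miss total.
shared = base.clone()
for _ in range(5):
    shared.load_table("db.t")

print(loads_before, shared.catalog_loads)  # 5 1
```

The numbers differ only because each throwaway clone starts with an empty cache, which is exactly the behavior the PR removes.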

github-actions bot added the spark label on Jul 30, 2022
manuzhang (Collaborator, Author) commented Jul 30, 2022:

@rdblue @aokolnychyi @kbendick Please help review, and suggest where to add a unit test; I haven't found a proper place.

manuzhang force-pushed the fix-table-caching branch from 918bb15 to d756a4f on July 30, 2022 10:38
rdblue (Contributor) commented Jul 30, 2022:

I'm interested to hear what @szehon-ho and @RussellSpitzer think about this.

My initial reaction is that this is not something we should change. We don't want to disable AQE for other Spark work, which is a side-effect of this change. I also don't like that we need to create a new Spark session for each rewrite, but I don't think there is much we can do to avoid that if we want AQE disabled. We could also fail if AQE is on, or just accept the AQE results.

Also, is a separate table cache a bug? Since it is only used once, what is the problem with doing it this way? Sure, this won't cache the rewritten table, but is there a behavior problem or are loads just slightly slower?

manuzhang (Collaborator, Author) commented Jul 31, 2022:

@rdblue I've moved cloning the session and disabling AQE into RewriteDataFilesSparkAction.

When multiple rewrite actions are submitted concurrently, as below, they block each other when loading the table because of locks in the shared HiveExternalCatalog. That hurts the overall performance of the rewrite actions.

CALL spark_catalog.system.rewrite_data_files(table => 'default.table', options => map('max-concurrent-file-group-rewrites', '200'));
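The contention described here can be sketched with a toy model. This is a conceptual Python illustration, not Iceberg or Hive code: SharedCatalog stands in for a HiveExternalCatalog whose metadata calls are serialized by a global lock, and the shared table cache means only the first rewrite pays the locked catalog call:

```python
import threading

class SharedCatalog:
    """Toy HiveExternalCatalog: every metadata call takes a global lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.locked_calls = 0

    def get_table(self, name):
        with self.lock:  # all concurrent rewrites serialize here
            self.locked_calls += 1
            return ("table", name)

catalog = SharedCatalog()
cache = {}
cache_lock = threading.Lock()

def rewrite_group(use_cache):
    # With a shared table cache, only the first file group goes through the
    # locked catalog call; without it, every file group contends on the lock.
    if use_cache:
        with cache_lock:
            if "db.t" not in cache:
                cache["db.t"] = catalog.get_table("db.t")
    else:
        catalog.get_table("db.t")

def run(use_cache, groups=8):
    catalog.locked_calls = 0
    cache.clear()
    threads = [threading.Thread(target=rewrite_group, args=(use_cache,))
               for _ in range(groups)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return catalog.locked_calls

print(run(False), run(True))  # 8 1
```

With per-group caches every rewrite hits the shared lock; with one cache per action only the first load does.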

manuzhang (Collaborator, Author):

There is another lock in SessionCatalog#tableExists for Spark versions before apache/spark#31891 since sessionCatalog is also shared.

kbendick (Contributor):

Ignore my previous comments; I had the caches mistaken.

kbendick (Contributor):

I will say that I don't love the idea of asking users to disable AQE themselves (i.e. not cloning the session and instead requiring AQE to be off), even though cloning the session is somewhat of a pain.

People use these statements at the end of queries and disabling AQE would be a bummer.

// Disable Adaptive Query Execution as this may change the output partitioning of our write
SparkSession spark = spark().cloneSession();
spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
return new SparkBinPackStrategy(table, spark);
A Contributor commented on the diff above:

Bin packing is used as the basis for the other strategies. If you're moving this into the action itself, then you should move it into spark().

Collaborator (Author):

If it's moved into spark(), then the SparkSession is cloned with AQE disabled for all BaseSparkActions. I'm not sure about the side-effects.

Contributor:

Given that spark() is protected in BaseSparkAction, it could be overridden within this action.

Or another spark() method could be added that clones and disables AQE, like spark(boolean cloneAndDisableAQE) or something.

manuzhang (Collaborator, Author) commented Aug 11, 2022:

I don't see the difference here, since we need to invoke spark() from the super class to get the SparkSession anyway.

rdblue (Contributor) commented Jul 31, 2022:

@manuzhang, it seems reasonable to create a session for the entire rewrite, not just each Spark submission. Is that what was happening before?

manuzhang (Collaborator, Author):

> Is that what was happening before?

Do you mean session was created for the entire rewrite before?

rdblue (Contributor) commented Aug 1, 2022:

> Do you mean session was created for the entire rewrite before?

I'm asking you what the behavior was before this change that you want to fix.

manuzhang (Collaborator, Author):

Not sure. The furthest back I can track is #2591, and the behavior is the same as now.

manuzhang (Collaborator, Author):

@rdblue Any more concerns or suggestions for this PR?

manuzhang (Collaborator, Author):

@rdblue and @kbendick any more comments?

manuzhang (Collaborator, Author):

Gentle ping @rdblue @aokolnychyi @kbendick for another review

ajantha-bhat (Member) left a comment:

LGTM.
Thanks for addressing the comments and for the fix.

@Fokko, @rdblue, @aokolnychyi, @szehon-ho, @danielcweeks : Please help in review/merge.

@@ -185,6 +186,14 @@ public RewriteDataFiles.Result execute() {
     }
   }

+  @Override
+  protected SparkSession spark() {
Contributor:

This may only be called once right now, but I think we should not assume that it always will be. Can you update this to keep a copy of the session for this action?

Collaborator (Author):

Not sure what you mean by "copy". It is now cloning the SparkSession for this action in binPackStrategy, sortStrategy, and zOrderStrategy.

Member:

I think what he meant is: keep a local SparkSession variable in RewriteDataFilesSparkAction, and initialize it once by cloning the session and disabling adaptive execution.

Whenever spark() is called, return that variable instead of cloning again and again.

Collaborator (Author):

Good point. It looks like there's no need to override this method; we can simply clone in the constructor of RewriteDataFilesSparkAction.
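The clone-once pattern being discussed can be sketched as follows. This is a hypothetical Python model, not the actual Iceberg classes: the clone is created and configured once in the constructor, and every later accessor call returns that same clone.

```python
class Session:
    """Toy SparkSession: clone() copies config into a new session object."""

    def __init__(self, conf=None):
        self.conf = dict(conf or {})

    def clone(self):
        return Session(self.conf)

class BaseAction:
    def __init__(self, session):
        self._session = session

    def spark(self):
        return self._session

class RewriteAction(BaseAction):
    """Clone once in the constructor; spark() always returns that clone."""

    def __init__(self, session):
        clone = session.clone()
        # spark.sql.adaptive.enabled is Spark's real AQE switch.
        clone.conf["spark.sql.adaptive.enabled"] = "false"
        super().__init__(clone)

original = Session({"spark.sql.adaptive.enabled": "true"})
action = RewriteAction(original)

assert action.spark() is action.spark()  # same clone on every call
assert action.spark() is not original    # original session untouched
assert original.conf["spark.sql.adaptive.enabled"] == "true"
assert action.spark().conf["spark.sql.adaptive.enabled"] == "false"
```

Storing the clone once avoids re-cloning on every spark() call while keeping AQE enabled in the caller's session.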

manuzhang (Collaborator, Author):

@ajantha-bhat I forgot to remove cloneSession in SparkSortStrategy and SparkZOrderStrategy. Please review again.

-SparkSession cloneSession = spark().cloneSession();
-cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+SparkSession spark = spark();
Member:

Instead of cloning again, aren't we supposed to store the session coming in through the constructor into a variable and use it here?

manuzhang (Collaborator, Author) commented Oct 21, 2022:

This is not cloning again; it's spark() from SparkSortStrategy.

Member:

Got it. The similar method name is really confusing. Just by reading this block of code, I couldn't figure out that spark() was also defined in SparkSortStrategy and is not from RewriteDataFilesSparkAction.

Member:

Maybe we can add a comment that it comes from the parent, or rename the method in the parent.

Collaborator (Author):

Sure. I'm wondering whether spark can be a protected field; the current approach is a bit of an overuse of design patterns, IMO.

RussellSpitzer (Member) commented Nov 3, 2022:

Our current style rules don't allow protected fields, but I guess we could make an exception here. Long term, I think we imagined that ZOrderStrategy would just become a special case of SortStrategy once we have the ability to use a multi-arg transform as a sort order.

ajantha-bhat (Member):

> @ajantha-bhat I forgot to remove cloneSession in SparkSortStrategy and SparkZOrderStrategy. Please review again.

Yeah, last time I commented that these other places also had issues, but I forgot to recheck 😔 whether they were addressed. I think we now cover all the places. Thanks.

manuzhang (Collaborator, Author):

@ajantha-bhat @rdblue any more comments?

 // Reset Shuffle Partitions for our sort
 long numOutputFiles =
     numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
-cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
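The sizing logic in the code above can be sketched as follows. This is a conceptual Python model; the assumption that numOutputFiles divides the scaled input size by a target file size and rounds up is mine, not taken from the diff:

```python
import math

def num_output_files(total_bytes, target_file_size_bytes):
    # Assumed behavior: round the scaled input size up to whole output files.
    return math.ceil(total_bytes / target_file_size_bytes)

def shuffle_partitions(input_file_sizes, size_estimate_multiple,
                       target_file_size_bytes):
    scaled = int(sum(input_file_sizes) * size_estimate_multiple)
    # Mirrors Math.max(1, numOutputFiles): never request zero partitions.
    return max(1, num_output_files(scaled, target_file_size_bytes))

# 10 input files of 64 MB each, compacted toward 512 MB target files:
parts = shuffle_partitions([64 * 1024**2] * 10, 1.0, 512 * 1024**2)
print(parts)  # 2
```

Setting shuffle partitions to the expected output file count keeps the sort's write from producing many undersized files or one oversized one.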
Member:

I'm a little worried that this may create a race condition between rewrite actions running at the same time in the same JVM.

Member:

Actually, I think this is fine since each action gets its own cloned session.

@@ -94,10 +95,14 @@
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
+  private final SparkSession cloneSession;

   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
RussellSpitzer (Member) commented Nov 3, 2022:

I think this needs to be the cloned session; otherwise the subclass calls would be modifying the original session, not the new cloned one, correct?

Like here -
https://github.com/apache/iceberg/pull/5392/files#diff-39b303771b5d730c63672bb27597474e2b84a7f1b4b3f8b22fb58352c34f8968R206

Contributor:

I agree. In general, I think this should probably be overriding spark() so that the same context is used everywhere consistently.

RussellSpitzer (Member):

> @manuzhang, it seems reasonable to create a session for the entire rewrite, not just each Spark submission. Is that what was happening before?

Yes, basically: the old behavior was to clone the SparkSession for each file group rewrite, rather than once for the entire action.

RussellSpitzer (Member) left a comment:

Overall I think this is the right approach. If we are going to modify session state for a given action, it makes sense to clone at the action level rather than within the rewriteFiles call.

That said, we do have to be careful that spark() returns the cloned session and not the original. Once that's fixed, I think this is good to go.

manuzhang (Collaborator, Author):

@RussellSpitzer please check again whether it's fixed now.

manuzhang (Collaborator, Author):

@RussellSpitzer @rdblue @ajantha-bhat please take another look. Thanks.

rdblue (Contributor) commented Nov 27, 2022:

I think that all of the session references are to the cloned Spark session now. +1.

rdblue merged commit 4caf1b4 into apache:master on Nov 27, 2022
rdblue (Contributor) commented Nov 27, 2022:

Thanks, @manuzhang!

manuzhang (Collaborator, Author):

Thanks @rdblue @ajantha-bhat @RussellSpitzer @kbendick @hililiwei for your review.
I opened #6284 and #6285 to back-port the PR to Spark 3.2 and 3.1 respectively. Please help review those PRs as well.

manuzhang deleted the fix-table-caching branch on March 26, 2024