Spark: Fix a separate table cache being created for each rewriteFiles #5392
Conversation
@rdblue @aokolnychyi @kbendick Please help review and suggest where to add a unit test. I haven't found a proper place.
I'm interested to hear what @szehon-ho and @RussellSpitzer think about this. My initial reaction is that this is not something that we should change. We don't want to disable AQE for other Spark work, which is a side-effect of this change. I also don't like that we need to create a new Spark session for each rewrite, but I don't think there is much we can do to avoid it if we want to disable AQE. We could also fail if AQE is on, or just accept the AQE results. Also, is a separate table cache a bug? Since it is only used once, what is the problem with doing it this way? Sure, this won't cache the rewritten table, but is there a behavior problem or are loads just slightly slower?
@rdblue I've moved cloning the session and disabling AQE into When multiple rewrite actions are submitted concurrently as follows, they block each other when loading the table due to locks in the shared
There is another lock in
Ignore my previous comments; I had the caches mistaken.
I will say that I don't love the idea of asking users to disable AQE (i.e. not cloning the session and ensuring AQE is disabled), as much as cloning the session is somewhat of a pain. People use these statements at the end of queries, and disabling AQE would be a bummer.
```java
// Disable Adaptive Query Execution as this may change the output partitioning of our write
SparkSession spark = spark().cloneSession();
spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
return new SparkBinPackStrategy(table, spark);
```
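To illustrate why cloning works here: a cloned session gets a copy of the parent's conf, so disabling AQE on the clone does not affect the original session. A minimal runnable sketch, where `Session` is a hypothetical stand-in for Spark's `SparkSession` (plain Java, no Spark dependency):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SparkSession, used only to illustrate clone semantics.
class Session {
    private final Map<String, String> conf = new HashMap<>();

    // cloneSession() copies the current conf; later changes to the clone
    // do not leak back into the original session.
    Session cloneSession() {
        Session c = new Session();
        c.conf.putAll(this.conf);
        return c;
    }

    void set(String key, String value) { conf.put(key, value); }

    String get(String key, String dflt) { return conf.getOrDefault(key, dflt); }
}

public class Main {
    public static void main(String[] args) {
        Session original = new Session();
        original.set("spark.sql.adaptive.enabled", "true");

        // Mirrors the snippet above: clone, then disable AQE only on the clone.
        Session clone = original.cloneSession();
        clone.set("spark.sql.adaptive.enabled", "false");

        System.out.println(original.get("spark.sql.adaptive.enabled", "?")); // true
        System.out.println(clone.get("spark.sql.adaptive.enabled", "?"));    // false
    }
}
```

This is only a model of the conf-isolation behavior being relied on, not Spark's actual implementation.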
Bin packing is used as the basis for the other strategies. If you're moving this into the action itself, then you should move it into spark().
If it's moved into spark(), then the SparkSession is cloned with AQE disabled for all BaseSparkActions. I'm not sure about the side-effect.
Given that spark() is protected in BaseSparkAction, it could be overridden within this action. Or another spark() method could be added that clones and disables AQE, like spark(boolean cloneAndDisableAQE) or something.
I don't see the difference here, since we need to invoke spark() from the super class to get the SparkSession anyway.
@manuzhang, it seems reasonable to create a session for the entire rewrite, not just each Spark submission. Is that what was happening before?
Do you mean a session was created for the entire rewrite before?
I'm asking you what the behavior was before this change that you want to fix.
Not sure. The furthest I can track back is #2591, and the behavior is the same as now.
@rdblue Any more concerns or suggestions for this PR?
Gentle ping @rdblue @aokolnychyi @kbendick for another review.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
LGTM.
Thanks for addressing the comments and for the fix.
@Fokko, @rdblue, @aokolnychyi, @szehon-ho, @danielcweeks : Please help in review/merge.
```diff
@@ -185,6 +186,14 @@ public RewriteDataFiles.Result execute() {
     }
   }

+  @Override
+  protected SparkSession spark() {
```
This may only be called once right now, but I think we should not assume that it always will be. Can you update this to keep a copy of the session for this action?
Not sure what you mean by "copy". It is now cloning the SparkSession for this action in binPackStrategy, sortStrategy, and zOrderStrategy.
I think what he meant is: keep a local variable of SparkSession in RewriteDataFilesSparkAction, initialize it once by cloning the session and disabling adaptive execution, and whenever spark() is called, return that variable instead of cloning again and again.
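The caching suggested above can be sketched as follows. The `Session` and `RewriteAction` classes are hypothetical plain-Java stand-ins (not Iceberg's or Spark's real classes), used only to show the clone-once-and-cache pattern:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SparkSession (plain Java, no Spark dependency).
class Session {
    private final Map<String, String> conf = new HashMap<>();
    Session cloneSession() { Session c = new Session(); c.conf.putAll(conf); return c; }
    void set(String k, String v) { conf.put(k, v); }
    String get(String k, String d) { return conf.getOrDefault(k, d); }
}

// The suggestion above: clone once, cache the clone in a field, and have
// spark() return that cached clone on every call instead of re-cloning.
class RewriteAction {
    private final Session clonedSpark;

    RewriteAction(Session spark) {
        Session clone = spark.cloneSession();
        clone.set("spark.sql.adaptive.enabled", "false");
        this.clonedSpark = clone;
    }

    Session spark() { return clonedSpark; } // same instance every time
}

public class Main {
    public static void main(String[] args) {
        Session original = new Session();
        original.set("spark.sql.adaptive.enabled", "true");
        RewriteAction action = new RewriteAction(original);

        System.out.println(action.spark() == action.spark()); // true: no re-cloning
        System.out.println(action.spark().get("spark.sql.adaptive.enabled", "?")); // false
        System.out.println(original.get("spark.sql.adaptive.enabled", "?"));       // true
    }
}
```

Caching also keeps per-action conf changes (such as shuffle partitions) applied to one consistent session for the whole action.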
Good point. It looks like there's no need to override this method; we can simply clone in the constructor of RewriteDataFilesSparkAction.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@ajantha-bhat I forgot to remove cloneSession in
```diff
- SparkSession cloneSession = spark().cloneSession();
- cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);

+ SparkSession spark = spark();
```
Instead of cloning again, aren't we supposed to store the spark coming in through the constructor into a variable and use it here?
This is not cloning again; this spark() is from SparkSortStrategy.
Got it. This similar method name is really confusing. Just by reading this block of code, I couldn't figure out that spark() was also there in SparkSortStrategy and that it is not from RewriteDataFilesSparkAction.
Maybe we can add a comment that it is from the parent, or rename the method in the parent.
Sure. I'm wondering whether spark can be a protected field. The current way is a bit of an overuse of design patterns, IMO.
Our current style rules don't allow protected fields, but we could make an exception here, I guess. I think long term we imagined that ZOrderStrategy will just be a special case of SortStrategy once we have the ability to use a multi-arg transform as a Sort Order.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
Yeah, last time I commented that these other places also have issues, and I forgot to recheck 😔 whether they were addressed. I think we now cover all the places. Thanks.
@ajantha-bhat @rdblue any more comments?
```diff
  // Reset Shuffle Partitions for our sort
  long numOutputFiles =
      numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
- cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+ spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
```
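For a sense of the arithmetic being configured here, a hedged sketch: the real numOutputFiles(...) helper lives in the action, so the ceiling-division implementation, the 512 MiB target file size, and the input size below are assumptions for illustration only:

```java
public class Main {
    // Assumed shape of the helper: ceiling division of total bytes by the
    // target file size (the real numOutputFiles(...) is defined in the action).
    static long numOutputFiles(long totalBytes, long targetFileSizeBytes) {
        return (totalBytes + targetFileSizeBytes - 1) / targetFileSizeBytes;
    }

    public static void main(String[] args) {
        long inputFileSize = 5L * 512 * 1024 * 1024; // 2.5 GiB of input files (assumed)
        double sizeEstimateMultiple = 1.0;           // no inflation (assumed)
        long targetFileSize = 512L * 1024 * 1024;    // 512 MiB target (assumed)

        long files = numOutputFiles((long) (inputFileSize * sizeEstimateMultiple), targetFileSize);
        long shufflePartitions = Math.max(1, files); // guard against 0, as in the snippet above
        System.out.println(shufflePartitions);       // 5
    }
}
```

The Math.max(1, ...) guard matters because a tiny input could otherwise request zero shuffle partitions.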
I'm a little worried that this may be a race condition between various rewrite actions occurring at the same time in the same JVM
Actually I think this is fine, since each Action gets its own cloned session.
```diff
@@ -94,10 +95,14 @@
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
+  private final SparkSession cloneSession;

   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
```
I think this needs to be the cloned session; otherwise the subclass calls would be modifying the original session and not the new cloned one, correct?
I agree. In general, I think this should probably be overriding spark() so that the same context is used everywhere consistently.
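The constructor-ordering point can be sketched with hypothetical stand-in types (none of these are Iceberg's real classes): clone before calling `super(...)`, so that `spark()` in the base class, and therefore in every subclass, refers to the clone rather than the original.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SparkSession, to keep the sketch runnable.
class Session {
    private final Map<String, String> conf = new HashMap<>();
    Session cloneSession() { Session c = new Session(); c.conf.putAll(conf); return c; }
    void set(String k, String v) { conf.put(k, v); }
    String get(String k, String d) { return conf.getOrDefault(k, d); }
}

class BaseAction {
    private final Session spark;
    BaseAction(Session spark) { this.spark = spark; }
    protected Session spark() { return spark; } // what all subclass calls see
}

// The point raised above: hand the CLONE to super(), so spark() in the base
// class (and therefore in every subclass) refers to the clone, not the original.
class RewriteAction extends BaseAction {
    RewriteAction(Session spark) {
        super(withAqeDisabled(spark.cloneSession()));
    }

    private static Session withAqeDisabled(Session s) {
        s.set("spark.sql.adaptive.enabled", "false");
        return s;
    }
}

public class Main {
    public static void main(String[] args) {
        Session original = new Session();
        original.set("spark.sql.adaptive.enabled", "true");
        RewriteAction action = new RewriteAction(original);

        // The action sees the AQE-disabled clone; the original is untouched.
        System.out.println(action.spark().get("spark.sql.adaptive.enabled", "?")); // false
        System.out.println(original.get("spark.sql.adaptive.enabled", "?"));       // true
    }
}
```

The static helper is needed because, in Java, the `super(...)` call must be the first statement in the constructor, so the clone has to be configured inside the argument expression.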
Yes, basically the old behavior would be to clone the Spark session for each file group rewrite, rather than once for the entire action.
Overall I think this is the right approach. If we are going to be modifying session state for a given action, I think it makes sense to clone at the action level rather than within the rewriteFiles call.
That said, I do think we have to be careful that the return of spark() is the cloned session and not the original. Once that is fixed, I think this is good to go.
@RussellSpitzer please check again whether it's fixed now.
@RussellSpitzer @rdblue @ajantha-bhat please take another look. Thanks.
I think that all of the session references are to the cloned Spark session now. +1.
Thanks, @manuzhang!
Thanks @rdblue @ajantha-bhat @RussellSpitzer @kbendick @hililiwei for your review.
Currently, during Spark's rewrite data files procedure with the bin pack strategy, SparkSession is cloned to disable AQE in each rewriteFiles. Since a cloned SparkSession has its own state, V2SessionCatalog is reloaded every time and a separate table cache is created. That means each file group has its own table cache, which effectively disables the table cache.

This PR fixes it by cloning SparkSession when creating SparkBinPackStrategy.