Spark: RewriteDatafilesAction V2 #2591
Conversation
@jackye1995 @rdblue @stevenzwu @chenjunjiedada Here is the big one. The implementation is 95% there; tests are in progress.
@@ -71,8 +71,9 @@
 * Method which will rewrite files based on this particular RewriteStrategy's algorithm.
 * This will most likely be Action framework specific (Spark/Presto/Flink ...).
 *
 * @param setId an identifier for this set of files
Since we are rewriting files in groups (if I'm not mistaken), would it make more sense to refer to this as a groupId?
Sure, I was mostly copying the current usage in RewriteManager.
I think this is heading in the right direction. We should spend some time simplifying the execute method. I think we can split it into a number of smaller methods.
import org.apache.iceberg.spark.actions.rewrite.Spark3BinPackStrategy;
import org.apache.spark.sql.SparkSession;

public class RewriteDataFilesSpark3 extends BaseRewriteDataFilesSparkAction {
BaseRewriteDataFilesSpark3Action?
I can change this, but is it a Base class if it's the leaf implementation? I know we have that convention on the other extensions, so I'll do it here too.
Force-pushed from a8cdeee to 7efc295
Force-pushed from 6f4e345 to 6634a51
Force-pushed from 0abcfbc to 329747d
Force-pushed from 680ca60 to 472afba
 * in practice this is not the case. When we actually write our files, they may exceed the target
 * size and end up being split. This would end up producing two files out of one task: one target sized
 * and one very small file. Since the output file can vary in size, it is better to
 * use a slightly larger (but still within threshold) size for actually writing the tasks out.
Does this happen on all file formats? What file format did you use for this test?
At least on Parquet, differences in compression and encoding seem to be the issues here. @aokolnychyi has more info, but one of the hypotheses was that smaller files used dictionary encoding while larger files did not.
Most of the experience with this is from production use cases with users who have large numbers of small files.
Thanks for the information! I was wondering whether this could happen with Avro as well, since it is a row-based file format and might not have dictionary encoding.
We have seen this quite a lot when tiny Parquet files are compacted into larger ones, as it changes the encoding on many columns. In most cases, the actual file size is bigger than what we estimated.
I am not sure about Avro. Cases where the estimation is precise enough should work as expected. The main cases we are trying to avoid are splitting 514 MB into 512 MB and 2 MB files, and writing 1 GB files when the target is 512 MB.
The ideal solution is to know how much the remaining rows are going to cost us, but we don't have that implemented yet.
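To make the trade-off concrete, here is a minimal sketch of the kind of padding being described; the class and variable names, the 10% padding factor, and the threshold value are illustrative assumptions, not the exact values used in this PR:

public class WriteSizeSketch {
  public static void main(String[] args) {
    long targetFileSize = 512L * 1024 * 1024;   // desired output file size (512 MB)
    long maxThreshold = 768L * 1024 * 1024;     // largest file we are still willing to write

    // Pad the write target so a task whose data compresses slightly worse than
    // estimated still lands in one file, instead of one full file plus a tiny remainder.
    long writeTargetSize = Math.min((long) (targetFileSize * 1.10), maxThreshold);

    System.out.println("Write target: " + writeTargetSize + " bytes");
  }
}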
Rewrote the documentation here to hopefully make it clearer.
manager.stageTasks(table, groupID, filesToRewrite);

// Disable Adaptive Query Execution as this may change the output partitioning of our write
SparkSession cloneSession = spark.cloneSession();
Is this safe? I thought sessions couldn't be closed?
I'm not sure what you are asking here; this method's doc says:
Create an identical copy of this SparkSession, sharing the underlying SparkContext and shared state. All the state of this session (i.e. SQL configurations, temporary tables, registered functions) is copied over, and the cloned session is set up with the same shared state as this session. The cloned session is independent of this session, that is, any non-global change in either session is not reflected in the other.
We basically just use this to make sure we don't change the user's Spark session config.
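As a rough sketch of that pattern (the AQE config key is standard Spark; the surrounding method and its name are illustrative, not the PR's exact code):

import org.apache.spark.sql.SparkSession;

public class CloneSessionSketch {
  // Returns a clone with Adaptive Query Execution disabled. The clone shares the
  // SparkContext but has its own SQL conf, so the caller's session is untouched.
  static SparkSession sessionForRewrite(SparkSession spark) {
    SparkSession cloned = spark.cloneSession();
    cloned.conf().set("spark.sql.adaptive.enabled", "false");  // non-global change
    return cloned;
  }
}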
I'm just a bit worried about creating session objects that never get cleaned up. This is a leak for anything calling this from a service, right?
The SparkSession objects? They should just get garbage collected as soon as this function is done, unless we are checking something else? It should be using SharedState for everything that has an explicitly managed lifecycle.
Okay, I just looked into Spark a bit more and I don't see anything keeping track of sessions created this way. My comment was based on an issue about closing a session. Looks like it's okay to just let them go away, I guess?
Yeah, I thought that old issue was about the coupling of SparkSession.close -> session.sparkContext.close, but I don't think there is any way to explicitly cause a session to disappear without just letting it get dereferenced. My understanding was that the Session object is essentially just a thin wrapper around a context with its own runtime conf (and shared state objects it shares with other sessions), so you should be free to just make a bunch of them. I think the only ones that never get dereferenced are the SparkSession.activeSession / defaultSession linked ones.
I did look into the code a little bit. Seems the current assumption is that the state will be GCed once a session becomes unreachable?
There is no other way to close a session, so I'm not sure what else we can do, since session.close just closes the SparkContext.
This uncertainty is why I don't usually create new sessions. But we'll see if it actually becomes a problem.
// Todo Add intelligence to the order in which we do rewrites instead of just using partition order
return fileGroupsByPartition.entrySet().stream()
    .flatMap(
        e -> {
nit: move e -> { to the line above and then reformat the block below? Will reduce the indentation in the closure.
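I.e., something like this (an illustrative reformatting only; the lambda body here is a placeholder, not the PR's actual code):

// Lambda opens on the same line as flatMap, pulling the body back one indent level.
return fileGroupsByPartition.entrySet().stream().flatMap(e -> {
  // ... rewrite the file groups for this partition ...
  return e.getValue().stream();
});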
@@ -58,9 +60,19 @@ public void stageTasks(Table table, String setID, List<FileScanTask> tasks) {
  return tasksMap.remove(id);
}

private Pair<String, String> toID(Table table, String setID) {
public Set<String> fetchSetIDs(Table table) {
Is this for testing? If so, can we make it package private?
Unfortunately this is in iceberg.spark and the test checking this is in iceberg.actions.
We have this issue where we have two different hierarchies for packaging actions, which we may want to resolve later, but this has to be public for now for me to access it in the tests.
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
Nit: extra newline.
@@ -93,7 +96,7 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
  return Collections.unmodifiableSet(rewrittenDataFiles);
}

- private Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
+ public Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
I think this can now just accept a single file set id.
I have to delete code out of FileRewriteCoordinator to do this, since the function is used in CommitOrRewrite. Can I do this in the follow-up PR?
@@ -71,6 +72,8 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
table.newRewrite()
    .rewriteFiles(rewrittenDataFiles, newDataFiles)
    .commit();
I think we need to remove redundant logic from this class now. Can be done in a separate PR to simplify the review. We should probably rename it too as it is no longer a commit coordinator.
We can probably align the naming with the task set manager or whatever we call it on the read side.
Yeah, I'll do another PR after this which prunes this class.
@@ -114,6 +117,11 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
  return newDataFiles;
}

public void clearRewrite(Table table, String fileSetID) {
Couldn't this class be used for any write that we want to hijack from Spark and commit later? It may be better to name it FileResultSetManager or something.
@@ -71,6 +72,8 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
table.newRewrite()
    .rewriteFiles(rewrittenDataFiles, newDataFiles)
    .commit();

fileSetIDs.stream().map(id -> toID(table, id)).forEach(resultMap::remove);
The commitRewrite methods are only called from tests. Since this has changed quite a bit, have we looked to see what is still needed?
As noted above I think we'll do a separate PR to clean up this file (and remove unneeded tests) post merge.
    cacheContents(table));
}

protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
Nit: the name of this function and the language used in the code seem sort of unrelated. Is there a name for this that would better reflect the error message (and code)? Or perhaps a small doc comment might help make this clearer to readers? Most things look great, but I'm admittedly second-guessing whether I understand why the function is named the way it is.
EDIT: Looking at the getOverlappingFiles function again, I still feel the name is a little confusing for me, but no comment is needed. I think my mind just hung up on the usage of should in this particular function name, but it's consistent with the other functions, so I wouldn't personally change it. Feel free to close!
I'm really used to ScalaTest, where everything is "X should Y", so I tend to name my assertions with "should". This function is actually for the Sort rewrite action I'll be adding to OSS next. It basically makes sure that our "Sort" actually sorted.
I like how this style made the tests readable. I've been using "check" or "assert" in method names like this. Whatever makes a readable test method is good.
I should clarify: my confusion was less with should and more that the name is shouldHaveLastCommitSorted, but the assertion is "Found overlapping files". I guess I would have expected shouldHaveNoOverlappingFiles or something.
I agree that the should etc. makes it more readable though 👍
Refactors API to use List and includes info in RewriteResult
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
    .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
    .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
    .option(SparkReadOptions.FILE_OPEN_COST, "0")
Question for my own understanding: why are you setting FILE_OPEN_COST to zero here?
So normally we are trying to weigh the cost of opening a new file vs the cost of starting a new Spark task. Opening files is expensive, and sometimes it makes more sense to just start another task in parallel rather than waiting to open another file serially within the same task. Imagine the case of many small files: if we pretend they are slightly larger with this value, then we bias our splits toward having fewer small files in them, which helps us make more Spark tasks even if they read less data.
In this case we aren't optimizing for read performance; instead we are trying to make each Spark task as close to our target size as possible, regardless of how many small files it takes. Here our goal is to write files of the correct size, so the number of input files doesn't really matter. To do this we set the open cost to 0.
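A simplified sketch of the weighting idea (this is not Iceberg's actual split planner, just an illustration of how a nonzero open cost inflates the weight of small files):

import java.util.List;

public class OpenCostSketch {
  // Each file contributes at least openFileCost bytes to its split's weight, so many
  // tiny files "fill" a split quickly and fan out across more tasks. With a cost of
  // 0, the weight is just the real bytes, packing files up to the target task size.
  static long splitWeight(List<Long> fileSizes, long openFileCost) {
    return fileSizes.stream().mapToLong(size -> Math.max(size, openFileCost)).sum();
  }

  public static void main(String[] args) {
    List<Long> tinyFiles = List.of(1_000L, 1_000L, 1_000L);
    System.out.println(splitWeight(tinyFiles, 4L * 1024 * 1024));  // inflated: ~12 MB
    System.out.println(splitWeight(tinyFiles, 0L));                // actual: 3000 bytes
  }
}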
Thanks for the detailed explanation!
This looks good to me. Apart from minor nits, I think we should move RewriteStrategy to core and clean up the commit coordinator in Spark (in a separate PR).
Moved RewriteStrategy to Core
Other renames
LOG.info("Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
this.rewritesPerCommit = rewritesPerCommit;

committerService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
Minor: assignments to instance state without the this. prefix.
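Presumably meaning something like this (illustrative; the thread name format is an assumption):

// Style nit applied: make the instance-field assignment explicit with this.
this.committerService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
    .setNameFormat("Committer-Service")  // hypothetical name format
    .build());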
public void offer(RewriteFileGroup group) {
  LOG.debug("Offered to commit service: {}", group);
  Preconditions.checkState(running.get(), "Cannot add rewrites to a service which has already been closed");
  completedRewrites.add(group);
I think there's a race condition that could cause a group to get ignored:
- One thread enters this method and running.get() is true, but is then paused before adding the group.
- Another thread calls close, setting running to false.
- The service thread consumes the rest of completedRewrites and commits.
I don't think this is likely to happen given the current use of this class. Plus, the commit in the service thread would take forever and the original thread calling offer would almost certainly run. So maybe it isn't a real problem, but it seems like it would be a simple fix to use a synchronized block to guard completedRewrites, as in the sketch below.
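A minimal sketch of that guard, assuming running is an AtomicBoolean and completedRewrites is a collection as in the snippet above; everything else here is illustrative rather than the PR's actual code:

// Take the same lock in offer() and close(), so the check-then-add in offer()
// can never interleave with close() flipping the running flag.
public void offer(RewriteFileGroup group) {
  LOG.debug("Offered to commit service: {}", group);
  synchronized (completedRewrites) {
    Preconditions.checkState(running.get(), "Cannot add rewrites to a service which has already been closed");
    completedRewrites.add(group);
  }
}

public void close() {
  synchronized (completedRewrites) {
    running.set(false);  // after this point no offer() can add a group
  }
  // ... drain completedRewrites and commit any remaining groups ...
}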
Yeah, there is a race here, which is why I put the precondition check at the bottom of the commit close function. Since this is internal(ish), I didn't want to add a synchronized block, because any calls of "offer" after "close" are an error in our implementation. The precondition is more of a warning for us writing the code that the implementation of RewriteDatafilesAction is wrong. If we change the packaging of this we could make all of these package private :(
Thanks, @RussellSpitzer! I merged this. And thanks to everyone that helped review!
Thanks everyone!
Adds an implementation for Spark3 for performing rewrites using the new action API. Only implements Spark3 at the moment, with the BinPack strategy.