Spark: RewriteDatafilesAction V2 #2591

Merged
27 commits merged on Jul 11, 2021

Conversation

RussellSpitzer
Member

Adds a Spark 3 implementation for performing rewrites using the new
action API. Only implemented for Spark 3 at the moment, with the BinPack strategy.

@RussellSpitzer
Member Author

RussellSpitzer commented May 13, 2021

@jackye1995 @rdblue @stevenzwu @chenjunjiedada Here is the big one. The implementation is 95% there; tests are in progress.

@@ -71,8 +71,9 @@
* Method which will rewrite files based on this particular RewriteStrategy's algorithm.
* This will most likely be Action framework specific (Spark/Presto/Flink ....).
*
* @param setId an identifier for this set of files
Contributor

Since we are rewriting files in groups (if I'm not mistaken), would it make more sense to refer to this as a groupId?

Member Author

Sure, I was mostly copying the current usage in RewriteManager.

Contributor
@aokolnychyi left a comment

I think this is heading in the right direction. We should spend some time simplifying the execute method. I think we can split it into a number of smaller methods.

import org.apache.iceberg.spark.actions.rewrite.Spark3BinPackStrategy;
import org.apache.spark.sql.SparkSession;

public class RewriteDataFilesSpark3 extends BaseRewriteDataFilesSparkAction {
Contributor

BaseRewriteDataFilesSpark3Action?

Member Author

I can change this, but is it a Base class if it's the leaf implementation? I know we have that on the other extensions, so I'll do it here too.

@RussellSpitzer force-pushed the SparkRewriteDataFiles branch from a8cdeee to 7efc295 (May 14, 2021 03:42)
@RussellSpitzer force-pushed the SparkRewriteDataFiles branch from 6f4e345 to 6634a51 (May 18, 2021 19:58)
@RussellSpitzer force-pushed the SparkRewriteDataFiles branch from 0abcfbc to 329747d (May 19, 2021 16:20)
@RussellSpitzer force-pushed the SparkRewriteDataFiles branch from 680ca60 to 472afba (May 19, 2021 19:54)
* in practice this is not the case. When we actually write our files, they may exceed the target
* size and end up being split. This would end up producing 2 files out of one task, one target sized
* and one very small file. Since the output file can vary in size, it is better to
* use a slightly larger (but still within threshold) size for actually writing the tasks out.
Collaborator

Does this happen on all file formats? What file format did you use for this test?

Member Author
@RussellSpitzer May 20, 2021

At least on Parquet, differences in compression and encoding seem to be the issues here. @aokolnychyi has more info, but one of the hypotheses was that smaller files used dictionary encoding while larger files did not.

Most of the experience with this comes from production use cases involving users with large numbers of small files.

Collaborator

Thanks for the information! I was wondering whether this could happen with Avro as well, since it is a row-based file format and might not have dictionary encoding.

Contributor
@aokolnychyi Jun 10, 2021

We have seen this quite a lot when tiny Parquet files are compacted into larger ones as it changes the encoding on many columns. In most cases, the actual file size is bigger than what we estimated.

I am not sure about Avro. Cases where the estimation is precise enough should work as expected. The main cases we are trying to avoid are splitting 514 MB into 512 MB and 2 MB files, and writing 1 GB files when the target is 512 MB.

The ideal solution is to know how much the remaining rows are going to cost us, but we don't have that implemented yet.
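As a rough illustration of the headroom idea discussed above, here is a minimal sketch with assumed constants (the names and the 10% figure are illustrative, not the actual values used in the PR):

```java
// Sketch: let tasks write up to a size slightly above the target (but still within the
// max-size threshold) so a task whose output compresses worse than estimated is not
// split into one target-sized file plus a tiny remainder.
class WriteSizeSketch {
  static final long TARGET_FILE_SIZE_BYTES = 512L * 1024 * 1024; // example target: 512 MB
  static final double HEADROOM = 1.10;                           // assumed 10% slack

  static long writeMaxFileSize() {
    return (long) (TARGET_FILE_SIZE_BYTES * HEADROOM);           // ~563 MB handed to the writer
  }
}
```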

Member Author

Rewrote the documentation here to hopefully make it clearer.

manager.stageTasks(table, groupID, filesToRewrite);

// Disable Adaptive Query Execution as this may change the output partitioning of our write
SparkSession cloneSession = spark.cloneSession();
Contributor

Is this safe? I thought sessions couldn't be closed?

Member Author

I'm not sure what you are asking here; this method's doc says:
Create an identical copy of this SparkSession, sharing the underlying SparkContext and shared state. All the state of this session (i.e. SQL configurations, temporary tables, registered functions) is copied over, and the cloned session is set up with the same shared state as this session. The cloned session is independent of this session, that is, any non-global change in either session is not reflected in the other.

We basically just use this to make sure we don't change the user's Spark session config.
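For reference, a sketch of what that looks like (the flag is Spark's spark.sql.adaptive.enabled config key; the surrounding code is illustrative rather than the exact lines in the PR):

```java
// Sketch: clone the session so the config change stays local to the rewrite and the
// user's original session settings are untouched.
SparkSession cloneSession = spark.cloneSession();
cloneSession.conf().set("spark.sql.adaptive.enabled", "false");
```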

Contributor

I'm just a bit worried about creating session objects that never get cleaned up. This is a leak for anything calling this from a service, right?

Member Author

The SparkSession objects? They should just get garbage collected as soon as this function is done. Unless we are checking something else? It should be using shared state for everything that has an explicitly managed lifecycle.

Contributor

Okay, I just looked into Spark a bit more and I don't see anything keeping track of sessions created this way. My comment was based on an issue about closing a session. Looks like it's okay to just let them go away, I guess?

Member Author

Yeah, I thought that old issue was about the coupling of SparkSession.close -> session.sparkContext.close, but I don't think there is any way to explicitly cause a session to disappear other than just letting it get dereferenced. My understanding was that the Session object is essentially just a thin wrapper around a context with its own runtime conf (and shared state objects it shares with other sessions), so you should be free to just make a bunch of them. I think the only ones that never get dereferenced are the SparkSession.activeSession / defaultSession linked ones.

Contributor

I did look into the code a little bit. Seems the current assumption is that the state will be GCed once a session becomes unreachable?

Member Author

There is no other way to close a session, so I'm not sure what else we can do, since session.close just closes the SparkContext.

Contributor

This uncertainty is why I don't usually create new sessions. But we'll see if it actually becomes a problem.

// Todo Add intelligence to the order in which we do rewrites instead of just using partition order
return fileGroupsByPartition.entrySet().stream()
.flatMap(
e -> {
Contributor

nit: move e -> { to the line above and then reformat the block below? Will reduce the indentation in the closure.
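Something like this, I think (a sketch of the formatting suggestion only; the lambda body is elided):

```java
return fileGroupsByPartition.entrySet().stream().flatMap(e -> {
  // ... existing body, now one indentation level shallower ...
});
```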

@@ -58,9 +60,19 @@ public void stageTasks(Table table, String setID, List<FileScanTask> tasks) {
return tasksMap.remove(id);
}

private Pair<String, String> toID(Table table, String setID) {
public Set<String> fetchSetIDs(Table table) {
Contributor

Is this for testing? If so, can we make it package private?

Member Author

Unfortunately this is iceberg.spark and the test checking this is in iceberg.actions.

We have this issue where we have two different hierarchies for packaging actions, which we may want to resolve later, but this has to be public for now so I can access it in the tests.

Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();

Contributor

Nit: extra newline.

@@ -93,7 +96,7 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
return Collections.unmodifiableSet(rewrittenDataFiles);
}

private Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
public Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
Contributor

I think this can now just accept a single file set id.

Member Author

I have to delete code out of FileRewriteCoordinator to do this, since the function is used in CommitOrRewrite. I can do this in the follow-up PR?

@@ -71,6 +72,8 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
table.newRewrite()
.rewriteFiles(rewrittenDataFiles, newDataFiles)
.commit();

Contributor
@aokolnychyi Jul 6, 2021

I think we need to remove redundant logic from this class now. Can be done in a separate PR to simplify the review. We should probably rename it too as it is no longer a commit coordinator.

Contributor

We can probably align the naming with the task set manager or whatever we call it on the read side.

Member Author

Yeah, I'll do another PR after this that prunes this class.

@@ -114,6 +117,11 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
return newDataFiles;
}

public void clearRewrite(Table table, String fileSetID) {
Contributor

Couldn't this class be used for any write that we want to hijack from Spark and commit later? It may be better to name it FileResultSetManager or something.

@@ -71,6 +72,8 @@ public void commitRewrite(Table table, Set<String> fileSetIDs) {
table.newRewrite()
.rewriteFiles(rewrittenDataFiles, newDataFiles)
.commit();

fileSetIDs.stream().map(id -> toID(table, id)).forEach(resultMap::remove);
Contributor

The commitRewrite methods are only called from tests. Since this has changed quite a bit, have we looked to see what is still needed?

Member Author
@RussellSpitzer Jul 7, 2021

As noted above, I think we'll do a separate PR to clean up this file (and remove unneeded tests) post-merge.

cacheContents(table));
}

protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
Contributor
@kbendick Jul 7, 2021

Nit: the name of this function and the language used in the code seem sort of unrelated. Is there a name for this that would better reflect the error message (and code)? Or perhaps a small doc comment might help make this clearer to readers? Most things look great, but I'm admittedly second-guessing whether I understand why the function is named the way it is.

EDIT: Looking at the getOverlappingFiles function again, I still find the name a little confusing, but no comment is needed. I think my mind just hung up on the use of "should" in this particular function name, and it's consistent with the other functions, so I wouldn't personally change it. Feel free to close!

Member Author

I'm really used to ScalaTest, where everything is "X should Y", so I tend to name my assertions "should". This function is actually for the Sort Rewrite Action I'll be adding to OSS next. It basically makes sure that our "Sort" actually sorted.

Contributor

I like how this style made the tests readable. I've been using "check" or "assert" in method names like this. Whatever makes a readable test method is good.

Contributor
@kbendick Jul 23, 2021

I should clarify: my confusion was less with "should" and more that the method is shouldHaveLastCommitSorted while the assertion message is "Found overlapping files".

I guess I would have expected shouldHaveNoOverlappingFiles or something.

I agree that the "should" style makes it more readable though 👍

Refactors API to use List and includes info in RewriteResult
Dataset<Row> scanDF = cloneSession.read().format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
Contributor

Question for my own understanding: Why are you setting FILE_OPEN_COST to zero here?

Member Author

So normally we are trying to weigh the cost of opening a new file against the cost of starting a new Spark task. Opening files is expensive, and sometimes it makes more sense to just start another task in parallel rather than waiting to open another file serially within the same task. Imagine the case of many small files: if we pretend they are slightly larger with this value, we bias our splits toward containing fewer small files, which helps us make more Spark tasks even if each reads less data.

In this case we aren't optimizing for read performance; instead we are trying to make each Spark task as close to our target size as possible, regardless of how many small files that takes. Here our goal is to write files of the correct size, so the number of input files doesn't really matter. To do this we set the open cost to 0.
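A simplified illustration of the effect (this is not the actual planner code; the weight function and numbers below are assumptions for the example):

```java
class SplitPlanningSketch {
  // During split planning, each file contributes roughly max(fileLength, openFileCost)
  // to a split, so a non-zero open cost makes small files look bigger than they are.
  static long weight(long fileLength, long openFileCost) {
    return Math.max(fileLength, openFileCost);
  }
}

// With 1 MB input files and a 512 MB split size:
//   openFileCost = 4 MB -> each file "weighs" 4 MB -> ~128 files per task
//   openFileCost = 0    -> each file weighs its real 1 MB -> ~512 files per task,
//                          so each task's real bytes land close to the target size.
```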

Contributor

Thanks for the detailed explanation!

Contributor
@aokolnychyi left a comment

This looks good to me. Apart from minor nits, I think we should move RewriteStrategy to core and clean up the commit coordinator in Spark (in a separate PR).

Moved RewriteStrategy to Core
Other renames
LOG.info("Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
this.rewritesPerCommit = rewritesPerCommit;

committerService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
Contributor

Minor: assignments to instance state without the this. prefix.

public void offer(RewriteFileGroup group) {
LOG.debug("Offered to commit service: {}", group);
Preconditions.checkState(running.get(), "Cannot add rewrites to a service which has already been closed");
completedRewrites.add(group);
Contributor

I think there's a race condition that could cause a group to get ignored:

  • One thread enters this method and running.get() is true, but is then paused before adding the group.
  • Another thread calls close, setting running to false.
  • The service thread consumes the rest of completedRewrites and commits

I don't think this is likely to happen given the current use of this class. Plus, the commit in the service thread would take forever, and the original thread calling offer would almost certainly run. So maybe it isn't a real problem, but it seems like it would be a simple fix to use a synchronized block to guard completedRewrites.
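A sketch of the suggested guard (assuming completedRewrites and running are the fields shown in the snippet above; close() would take the same lock before flipping running):

```java
public void offer(RewriteFileGroup group) {
  LOG.debug("Offered to commit service: {}", group);
  // Check-and-add atomically so close() cannot flip `running` and drain the queue
  // between the precondition check and the add.
  synchronized (completedRewrites) {
    Preconditions.checkState(running.get(),
        "Cannot add rewrites to a service which has already been closed");
    completedRewrites.add(group);
  }
}
```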

Member Author

Yeah, there is a race here, which is why I put the precondition check at the bottom of the commit close function. Since this is internal(-ish), I didn't want to add a synchronized block, because any calls of offer after close are an error in our implementation. The precondition is more of a warning for us writing the code that the implementation of RewriteDatafilesAction is wrong. If we change the packaging of this we could make all of these package-private :(

@rdblue merged commit 25eaeba into apache:master Jul 11, 2021
@rdblue
Contributor

rdblue commented Jul 12, 2021

Thanks, @RussellSpitzer! I merged this. And thanks to everyone who helped review!

@RussellSpitzer
Member Author

Thanks everyone!
