Hive: Implement multi-table inserts #2228

Merged: 9 commits into apache:master on Mar 30, 2021

Conversation

@pvary (Contributor) commented Feb 8, 2021

When an insert statement adds data to multiple tables, the current OutputCommitter implementation fails to handle the situation.
Example query:

FROM customers
    INSERT INTO target1 SELECT *
    INSERT INTO target2 SELECT *

The change contains the following modifications (a rough sketch of the resulting commit flow follows the list):

  • We have to handle multiple table writers for a single TaskAttempt.
  • When committing the task we have to create a forCommit file for every target table.
    • For this we have to collect the target table names and locations when creating the JobConf.
  • When committing the job we have to commit all of the tables.
  • When aborting the job we have to clean up every target table directory.
  • When cleaning up after a job we have to do it for every table.
  • Added a test for multi-table inserts.
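
A minimal, hypothetical sketch of this job-commit flow, for illustration only (the OUTPUT_TABLES key matches the constant quoted later in this review; the class and helper names are placeholders, not the PR's actual HiveIcebergOutputCommitter code):

```java
// Hypothetical sketch of committing every registered output table, not the PR's actual code.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;

public class MultiTableCommitSketch {

  // Table names and locations are collected into the JobConf when the job is configured.
  static final String OUTPUT_TABLES = "iceberg.mr.output.tables";

  public void commitJob(JobContext jobContext) throws IOException {
    JobConf conf = jobContext.getJobConf();
    // One entry per INSERT target of the multi-table insert.
    List<String> outputs = Arrays.asList(conf.getTrimmedStrings(OUTPUT_TABLES));
    for (String output : outputs) {
      // Read the forCommit file(s) written for this table and append the new data files.
      commitTable(jobContext, output);
    }
  }

  private void commitTable(JobContext jobContext, String output) {
    // Placeholder: load the table, collect the data files written by the tasks,
    // and commit them in a single append operation.
  }
}
```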

github-actions bot added the MR label Feb 8, 2021

@marton-bod (Collaborator) left a comment

LGTM, thanks for the fixes and working on this @pvary!

@pvary (Contributor, Author) commented Feb 10, 2021

@rdblue: Could you please review this if you have time?
Thanks,
Peter

@pvary (Contributor, Author) commented Feb 16, 2021

@rdblue: Are you still available? If so could you please review?

Thanks,
Peter

.throwFailureWhenFinished()
.stopOnFailure()
.executeWith(tableExecutor)
.run(entry -> commitTable(fileExecutor, jobContext, entry.getKey(), entry.getValue()));

Member

Doesn't this break atomicity? I guess there really isn't a good fix until multi-table transactions, but I think @aokolnychyi has a small workaround for this sort of situation; I just don't remember how it works.

Contributor

Yeah, it would be great to integrate with #1849, but we can also get this out first and just document the lack of atomicity.

Contributor Author

Sadly, with the current Hive API we cannot have atomicity.

Here is how Hive currently works:

  1. Create a staging directory for the query inside the table root directories.
  2. Create files inside these staging directories and write the data there (Mappers/Reducers).
  3. With a MoveTask, move the contents of the staging directories to their final place.

The SerDe provides a way to change step 2, but we do not have access to the other steps.

On HDFS this works fairly well, since the HDFS move is fast and atomic (though still with no guarantee for multi-table inserts), but on S3 it is not "optimal". Hive solves this for ACID tables by making the changes visible to reads only when the transaction is committed. When #1849 is available we will have to do some kind of two-way commit, so that the Hive ACID transaction is committed together with the Iceberg transaction. I am not yet sure what it will look like. Maybe shared locking? Maybe adding a two-way commit to the API? We will find out when we get there.

Until the problem is solved properly, this is an admittedly limping solution that at least lets these queries execute, offering guarantees similar to those of the current Hive multi-table inserts over S3.

Contributor Author

And big thanks for the reviews @RussellSpitzer and @jackye1995!

Contributor

We will need to document that Hive multi-table inserts are not atomic.

public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size";
public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";

Contributor

Why not use the same thread pool? And why use a thread pool at all? This could be done serially.

Contributor Author

I can be convinced not to do it in parallel (my first patch contained no parallel execution). It really depends on the length of the Iceberg commits. I have seen generated BI queries with more than 10 target tables, but usually we will have only a single table as a target.

If we decide to keep the option to run the Iceberg commits in parallel, I would like to keep it as a separate pool with different sizing. So if iceberg.mr.commit.table.thread.pool.size is set to <= 1, or we have only a single target table, then we will run the commit in the same thread and not create an executor pool for the Iceberg commits.
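
A minimal sketch of that sizing rule, assuming a hypothetical helper; the default pool size shown and the class/method names are illustrative, not necessarily what the PR uses:

```java
// Hypothetical sketch: create a table-commit executor only when it can actually help.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;

class TableCommitExecutorSketch {

  static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
  static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10; // illustrative default

  // Returns null when the per-table commits should simply run in the caller's thread.
  static ExecutorService tableExecutor(Configuration conf, int numberOfOutputTables) {
    int poolSize = conf.getInt(COMMIT_TABLE_THREAD_POOL_SIZE, COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT);
    if (numberOfOutputTables <= 1 || poolSize <= 1) {
      return null;
    }
    // No point in starting more threads than there are tables to commit.
    return Executors.newFixedThreadPool(Math.min(poolSize, numberOfOutputTables));
  }
}
```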

Contributor

I would keep it simple until we have a need for parallelism. I expect the commits to be much faster than the actual execution, so I don't think that it will be a very significant delay if we do them serially. Probably better to avoid complexity until we need it later.

Contributor Author

> Overall, I don't think there are any blockers. I'd prefer to avoid the complexity of more thread pools and configuration constants, but it should be up to people that use Hive more than I do.

I would prefer to keep the parallel commits because I think the complexity is about the same thanks to the Tasks API: other than creating the executors, it is just a matter of using Tasks instead of loops (see the sketch below). On the other hand, since we do not have atomicity across the commits, we are better off committing as fast as possible to reduce the window for concurrent changes.
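
For illustration, a hedged sketch of the Tasks-versus-loop point; the names used here (outputs, tableExecutor, commitTable) are placeholders rather than the PR's actual fields:

```java
// Sketch only: the serial and parallel variants differ mostly in the form of the iteration.
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.iceberg.util.Tasks;

class SerialVsParallelCommitSketch {

  void commitSerially(List<String> outputs) {
    for (String output : outputs) {
      commitTable(output);
    }
  }

  void commitWithTasks(List<String> outputs, ExecutorService tableExecutor) {
    Tasks.foreach(outputs)
        .throwFailureWhenFinished()
        .stopOnFailure()
        .executeWith(tableExecutor)   // the executor controls the degree of parallelism
        .run(this::commitTable);
  }

  private void commitTable(String output) {
    // Placeholder for the per-table Iceberg commit.
  }
}
```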

I had to rebase, and during the rebase I realized that I had used the same FileIO to access the temp directory for every table. In the unit tests this was not an issue, since all of the tables used the same IO, but I had to fix it.

Thanks,
Peter

Contributor

Okay, whatever you prefer. Merge when you're ready!

@rdblue (Contributor) left a comment

Overall, I don't think there are any blockers. I'd prefer to avoid the complexity of more thread pools and configuration constants, but it should be up to people that use Hive more than I do.

Properties catalogProperties = new Properties();
catalogProperties.put(Catalogs.NAME, name);
catalogProperties.put(Catalogs.LOCATION, location);
Table table = Catalogs.loadTable(conf, catalogProperties);

Collaborator

Do we still need this loadTable? Can't we pass in and use the table that we deserialized from the config in commitJob?

Contributor Author

In Hive we expect that inserts do not conflict with any other modification of the table.
The deserialized table is read-only, and we need to read a fresh version of the table, so we need this loadTable.

Collaborator

Makes sense. Just thinking out loud, but I'm wondering whether we can remove the table deserialization step if we have to load the table here anyway. For example, if we also cached the table locations under a similar config prefix and had a method like HiveIcebergStorageHandler.location(jobConf, output), could we get rid of the deserialization step?
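
Purely as a thought experiment, a sketch of this suggestion; the config prefix and helper names are hypothetical and not part of the PR:

```java
// Hypothetical sketch: cache each output table's location in the config so commitJob can
// call Catalogs.loadTable without deserializing the whole table.
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.Catalogs;

class LocationCacheSketch {

  // Hypothetical per-table key prefix, e.g. iceberg.mr.table.location.<output table name>
  static final String TABLE_LOCATION_PREFIX = "iceberg.mr.table.location.";

  // Called when the job is configured, once per output table.
  static void cacheLocation(Configuration conf, String output, String location) {
    conf.set(TABLE_LOCATION_PREFIX + output, location);
  }

  // Called from commitJob; assumes cacheLocation() ran for this output table earlier.
  static Table loadOutputTable(Configuration conf, String output) {
    Properties catalogProperties = new Properties();
    catalogProperties.put(Catalogs.NAME, output);
    catalogProperties.put(Catalogs.LOCATION, conf.get(TABLE_LOCATION_PREFIX + output));
    return Catalogs.loadTable(conf, catalogProperties);
  }
}
```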

Contributor Author

We need the table name and the location for loading the table, so if we store the location in the config somehow, then we can get rid of the full deserialization of the table.

OTOH, #2362 would help amortize the cost of deserializing the table if we only need the location.

Collaborator

Ok, let's park this until #2362 gets in. We might not need the extra complexity if we don't gain enough on the performance front.

@pvary merged commit d151034 into apache:master Mar 30, 2021

@pvary (Contributor, Author) commented Mar 30, 2021

Merged to master.
Thanks for the review @rdblue, @marton-bod, @jackye1995, @RussellSpitzer!

coolderli pushed a commit to coolderli/iceberg that referenced this pull request Apr 26, 2021
stevenzwu pushed a commit to stevenzwu/iceberg that referenced this pull request Jul 28, 2021