Hive: Implement multi-table inserts #2228
Conversation
LGTM, thanks for the fixes and working on this @pvary!
@rdblue: Could you please review this if you have time?
@rdblue: Are you still available? If so, could you please review? Thanks,
.throwFailureWhenFinished()
.stopOnFailure()
.executeWith(tableExecutor)
.run(entry -> commitTable(fileExecutor, jobContext, entry.getKey(), entry.getValue()));
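For context, the snippet above is the tail of a chain built with Iceberg's Tasks utility. Below is a minimal, self-contained sketch of such a per-table commit fan-out; the outputs map, the commitTable signature, and the executor handling are assumptions based on this fragment, not the exact code in the patch.

import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.util.Tasks;

class MultiTableCommitSketch {
  // Hypothetical stand-in for the real per-table commit; the signature is an assumption.
  static void commitTable(ExecutorService fileExecutor, String output, String location) {
    System.out.println("Committing " + output + " at " + location);
  }

  // Commit every output table, failing fast; a null tableExecutor runs the commits serially.
  static void commitAll(Map<String, String> outputs,
                        ExecutorService tableExecutor, ExecutorService fileExecutor) {
    Tasks.foreach(outputs.entrySet())
        .throwFailureWhenFinished()  // surface the first failure to the caller
        .stopOnFailure()             // stop scheduling new commits after a failure
        .executeWith(tableExecutor)  // null executor => commits run in the calling thread
        .run(entry -> commitTable(fileExecutor, entry.getKey(), entry.getValue()));
  }
}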
Doesn't this break atomicity? I guess there really isn't a good fix until multi-table transactions, but I think @aokolnychyi has a small workaround for this sort of situation; I don't remember how it works, though.
Yeah, it would be great to integrate with #1849, but we can also have this out first and just document the lack of atomicity.
Sadly, with the current Hive API we cannot have atomicity.
Here is how Hive currently works:
- Create a staging directory for the query inside the table root directories
- Create files inside these staging directories and write the data there (Mappers/Reducers)
- Move the contents of the staging directory to the final place with a MoveTask
The SerDe provides a way to change step 2, but we do not have access to the other steps.
On HDFS this works fairly well, since the HDFS move is fast and atomic (there are still no guarantees for multi-table inserts), but on S3 it is not "optimal". Hive solves this for ACID tables by making the changes available for reads only when the transaction is committed. When #1849 is available, we will have to do some kind of two-way commit, so that the Hive ACID transaction is committed together with the Iceberg transaction. I am not yet sure what it will look like. Maybe shared locking? Maybe adding a two-way commit to the API? We will find out when we get there.
Until the problem is solved properly, this could be a somewhat limping solution that lets these queries execute, offering guarantees similar to those of the current multi-table inserts over S3 with Hive.
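To make the atomicity caveat concrete, here is an illustrative sketch (not the committer's actual code) of why per-table commits cannot be rolled back together: each table gets its own snapshot commit, so a failure on a later table leaves the earlier tables' new snapshots visible.

import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class NonAtomicCommitSketch {
  // Illustrative only: independent per-table commits, no cross-table transaction.
  static void commitAll(Map<Table, DataFile> filePerTable) {
    for (Map.Entry<Table, DataFile> entry : filePerTable.entrySet()) {
      entry.getKey().newAppend()
          .appendFile(entry.getValue())
          .commit();  // if a later iteration fails, this snapshot stays committed
    }
  }
}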
And big thanks for the reviews @RussellSpitzer and @jackye1995!
We will need to document that Hive multi-table inserts are not atomic.
public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size";
public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
Why not use the same thread pool? And why use a thread pool at all? This could be done serially.
I can be convinced not to do it in parallel (my first patch contained no parallel execution). It really depends on the length of the Iceberg commit. I have seen generated BI queries with more than 10 target tables, but usually we will have only a single table as a target.
If we decide to keep the option to run the Iceberg commits in parallel, I would like to keep it as a separate pool with different sizing. If iceberg.mr.commit.table.thread.pool.size is set to <= 1, or we have only a single target table, then we will run the commit in the same thread and not create an executor pool for the Iceberg commits.
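A minimal sketch of the conditional pool creation described above, using the property name from this patch; the default value and the helper shape are assumptions, not the final implementation.

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;

class CommitExecutorSketch {
  // Hypothetical: only create a table-level commit pool when it can actually help.
  static ExecutorService tableExecutor(Configuration conf, Map<String, String> outputs) {
    int poolSize = conf.getInt("iceberg.mr.commit.table.thread.pool.size", 10); // assumed default
    if (poolSize <= 1 || outputs.size() <= 1) {
      return null;  // commit in the caller's thread, no extra pool
    }
    return Executors.newFixedThreadPool(Math.min(poolSize, outputs.size()));
  }
}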
I would keep it simple until we have a need for parallelism. I expect the commits to be much faster than the actual execution, so I don't think serial commits will add a significant delay. Probably better to avoid the complexity until we need it.
Overall, I don't think there are any blockers. I'd prefer to avoid the complexity of more thread pools and configuration constants, but it should be up to people that use Hive more than I do.
I would prefer to keep the parallel commits because I think the complexity is about the same thanks to the Tasks API. Other than creating the executors, it is just a matter of using Tasks instead of loops. On the other hand, since we do not have atomicity in commits, we are better off committing as fast as possible to prevent concurrent changes.
I had to rebase, and during the rebase I realized that I had used the same FileIO to access the temp directory for every table. In the unit tests this was not an issue, since all of the tables were using the same io, but I had to fix it.
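A minimal sketch of the fix described here: use each table's own FileIO when touching its temp files, instead of reusing a single FileIO instance for every table. The helper name and the temp-file argument are assumptions for illustration.

import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

class PerTableFileIoSketch {
  // Hypothetical: delete a temp marker using the owning table's FileIO (HDFS, S3, ...).
  static void deleteTempFile(Table table, String tempFileLocation) {
    FileIO io = table.io();           // table-specific IO implementation
    io.deleteFile(tempFileLocation);  // works regardless of which storage backs this table
  }
}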
Thanks,
Peter
Okay, whatever you prefer. Merge when you're ready!
Properties catalogProperties = new Properties();
catalogProperties.put(Catalogs.NAME, name);
catalogProperties.put(Catalogs.LOCATION, location);
Table table = Catalogs.loadTable(conf, catalogProperties);
Do we still need this loadTable? Can't we pass in and use the table that we deserialized from the config in commitJob?
In Hive we expect that inserts do not conflict with any other modification of the table.
The deserialized table is read-only, and we need to read a new version of the table, so we need this loadTable.
Makes sense. Just thinking out loud, but I'm wondering whether we can remove the table deserialization step if we have to load the table here anyway. For example, if we also cached the table locations with a similar config prefix and had a method HiveIcebergStorageHandler.location(jobConf, output), could we get rid of the deserialization step?
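A rough sketch of the idea floated here; the config key prefix, the caching step, and the location() helper are hypothetical and only mirror the suggestion, not an existing API.

import org.apache.hadoop.conf.Configuration;

class TableLocationSketch {
  private static final String LOCATION_PREFIX = "iceberg.mr.table.location."; // assumed key prefix

  // Hypothetical: stash each output table's location in the job conf at planning time...
  static void cacheLocation(Configuration conf, String output, String location) {
    conf.set(LOCATION_PREFIX + output, location);
  }

  // ...so the committer can read it back without deserializing the whole table.
  static String location(Configuration conf, String output) {
    return conf.get(LOCATION_PREFIX + output);
  }
}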
We need the table name and the location for loading the table. So if we store the location in the config somehow, then we can get rid of the full deserialization of the table.
OTOH #2362 would help amortize the cost of deserializing the table if we only need the location.
Ok, let's park this until #2362 gets in. We might not need the extra complexity if we don't gain enough on the performance front.
Merged to master.
When the insert statement adds data to multiple tables, the current OutputCommitter implementation fails to handle the situation.
Example query:
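An illustrative Hive multi-table insert of the kind this change targets (table and column names are made up):

FROM source_table
INSERT INTO TABLE target_a SELECT id, name
INSERT INTO TABLE target_b SELECT id, created_at;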
The change contains the following modifications:
- Every TaskAttempt creates a separate .forCommit file for every target table (sketched below)
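A minimal sketch of the per-table commit marker idea, with an assumed naming scheme; the actual file layout used by the committer may differ.

class ForCommitPathSketch {
  // Assumed layout: <table location>/temp/<queryId>-<taskAttemptId>.forCommit
  static String forCommitPath(String tableLocation, String queryId, String taskAttemptId) {
    return tableLocation + "/temp/" + queryId + "-" + taskAttemptId + ".forCommit";
  }
}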