support adding extra commit metadata with SQL in Spark #4956
rdblue merged 18 commits into apache:master
Conversation
String tableLocation = temp.newFolder("iceberg-table").toString();
HadoopTables tables = new HadoopTables(CONF);
int threadsCount = 3;
ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
Can you use the helper in ThreadPools and also wrap it in a try/finally to close the threadpool?
I'm also not entirely sure this requires a threadpool to test. I think it would be fine to test a single write in the current thread.
I would also prefer a test that uses a single write in the current thread, without any additional threading business. I worry that it won't be obvious to users that CommitMetadata is thread-local.
And then, if a multi-threaded test is needed, it should use the helpers from ThreadPools as suggested.
Updated to use ThreadPools. I think multi-threaded testing is still necessary: we need something guarding that the commit metadata handling is thread safe, whether we keep the current ThreadLocal or later change to something else for any reason.
I don't think multi-threaded testing is needed. It's enough to know that we're using a thread-local. This also is not guaranteed to run the way this test assumes that it will. There is not a guarantee that the thread pool will scale all the way up, and there's no guarantee that the tasks will each run in a separate thread. I think it's likely that those will happen, but this could still be a source of flakiness later on. Also, this doesn't necessarily test that the thread-local is working properly because there's no guarantee of concurrency across tasks.
While it's probably working the way you expect, there's no guarantee that it must. So I'd prefer to keep the test simple.
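The point of the discussion above is that thread-local state set in one thread is simply invisible to every other thread, so a single-threaded test already exercises the mechanism. A minimal self-contained sketch of the pattern (the class and method names here are hypothetical stand-ins, not the PR's actual code):

```java
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for a thread-local commit-properties holder,
// illustrating why values set in one thread are invisible to another.
public class ThreadLocalDemo {
    private static final ThreadLocal<Map<String, String>> PROPS =
        ThreadLocal.withInitial(Map::of);

    static <R> R withProperties(Map<String, String> props, Callable<R> callable) throws Exception {
        PROPS.set(props);
        try {
            return callable.call();
        } finally {
            PROPS.remove(); // avoid leaking state into pooled threads
        }
    }

    static Map<String, String> currentProperties() {
        return PROPS.get();
    }

    public static void main(String[] args) throws Exception {
        // Visible inside the callable on the same thread...
        String value = withProperties(Map.of("writer", "job-1"),
            () -> currentProperties().get("writer"));
        System.out.println(value);

        // ...but a different thread only ever sees the empty initial value.
        Thread other = new Thread(() -> System.out.println(currentProperties().isEmpty()));
        other.start();
        other.join();
    }
}
```

Because the `ThreadLocal` guarantees per-thread isolation by construction, a single write in the current thread is enough to verify the behavior; spinning up a pool adds flakiness without adding coverage.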
/**
 * utility class to accept thread local commit properties
 */
public class CommitMetadata {
Does this class need to be public? Could it be made package-private?
I have concerns around the usage of ThreadLocal for things that in most cases don't need to be thread-local. I don't want to give users too much room to hurt themselves because they don't consider that CommitMetadata is only thread-local, and then their writes don't work properly in the common case of writes without user-side multithreading (e.g. the metadata gets set in one thread somewhere, but another thread is used for the commit).
EDIT: Since this takes a Callable, it's less of a concern. I would still name it in a way that's a bit more reflective of the thread-local nature (especially if we wanted a CommitMetadata class one day that doesn't require a callable and is persistent). That, and I always prefer things to be package-private if possible.
Sure, changed to CallerWithCommitMetadata... but, eh, not sure if it is better or worse.
Does this class need to be public? Could it be made package-private?
Yes, this does need to be public because it is a way for Iceberg users to pass metadata.
kbendick
left a comment
Thinking this over more, there's really no way to achieve this use-case without this. So outside of the style nits Ryan mentioned, this looks pretty much good to me.
Thanks @CodingCat!
Just to confirm: it seems we don't support SQL-based table insert/merge in Spark 2.4 at all, right? (So this functionality is not relevant there.)
rdblue
left a comment
What was wrong with the CommitMetadata name? I think we should have a simpler name than CallerWithCommitMetadata. That's a bit too confusing. I think the original name was good.
In #4956 (comment), @kbendick mentioned some potential conflict with Structured Streaming's CommitMetadata. I don't have a strong opinion on this.
  return callable.call();
} catch (Throwable e) {
  ExceptionUtil.castAndThrow(e, exClass);
  return null;
[minor] Is this required, given that we throw on the line above?
I don't think the compiler sees that castAndThrow will always throw, so it needs this to know what to do.
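The compiler point can be seen with a stripped-down, self-contained example (names here are hypothetical stand-ins, and `castAndThrow` is simplified from Iceberg's `ExceptionUtil.castAndThrow`): javac's reachability analysis treats a `void` helper as a call that may return normally, so the catch block still needs a `return` on every path.

```java
public class AlwaysThrows {

    // Simplified stand-in for ExceptionUtil.castAndThrow: it always throws,
    // but its void signature gives javac no way to know that.
    static void castAndThrow(Throwable t) {
        throw new RuntimeException(t);
    }

    static String run(boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        } catch (Throwable e) {
            castAndThrow(e);
            // Unreachable at runtime, but deleting this line is a compile
            // error: "missing return statement".
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "ok"
    }
}
```

An alternative some codebases use is having the helper declare a return type and writing `throw castAndThrow(e);`, which makes the path visibly terminal to the compiler; the unreachable `return null` is the simpler, conventional fix.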
if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
  CallerWithCommitMetadata.commitProperties().forEach(operation::set);
}
[doubt] (Probably not in scope of this PR.) Should we add validation that the keys passed from here, as well as from extraSnapshotMetadata, don't override keys already present in the snapshot summary? Or is overriding intended?
I don't think that we need to worry about this. It is unlikely to conflict and if it does conflict, it's up to the caller to decide what to do.
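For illustration, the copy semantics being discussed can be shown with a minimal stand-in (all names here are hypothetical, not Iceberg's actual types): `forEach(operation::set)` just writes each caller-supplied property onto the operation, so on a key conflict the caller's value wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a snapshot operation exposing set(key, value),
// showing that forEach(operation::set) overwrites existing summary keys.
public class SummaryDemo {
    private final Map<String, String> summary = new LinkedHashMap<>();

    void set(String key, String value) {
        summary.put(key, value);
    }

    Map<String, String> summary() {
        return summary;
    }

    public static void main(String[] args) {
        SummaryDemo operation = new SummaryDemo();
        operation.set("spark.app.id", "app-1"); // pre-existing summary key

        Map<String, String> commitProperties =
            Map.of("spark.app.id", "caller-override", "writer", "job-1");
        if (!commitProperties.isEmpty()) {
            commitProperties.forEach(operation::set);
        }
        System.out.println(operation.summary().get("spark.app.id")); // caller-override
    }
}
```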
if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
  CallerWithCommitMetadata.commitProperties().forEach(operation::set);
}
[question] Should we also add this to SparkPositionDeltaWrite, since starting with 3.2 Iceberg supports MOR with position deletes?
@CodingCat, I think renaming the class back is about the only thing left to fix.
I'm good with the original name too (and prefer it to the new one).
@kbendick @rdblue @singhpk234 updated the PR accordingly, thanks!
Thanks, @CodingCat!
apache#4956: This is needed because Spark cannot pass additional metadata for some operations.
This PR implements the functionality for users to add extra commit metadata when operating on tables with SQL. It also allows users to use multithreading to commit data to tables while keeping the metadata thread-local.
new usage:
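The usage snippet that followed appears to have been cut off in this snapshot. Based on the API shape discussed in the thread (a properties map, a Callable, and an exception class handed to ExceptionUtil.castAndThrow), a hedged sketch of the usage; treat the exact signature and names as assumptions and verify against the merged code:

```java
// Sketch only: assumes CommitMetadata.withCommitProperties(Map, Callable, Class)
// as discussed in this PR; spark is an existing SparkSession.
Map<String, String> properties = Maps.newHashMap();
properties.put("extra-key", "extra-value");

CommitMetadata.withCommitProperties(properties, () -> {
    spark.sql("INSERT INTO db.tbl VALUES (1, 'a')");
    return 0;
}, RuntimeException.class);
```

The extra entries should then appear in the summary of the snapshot produced by the SQL commit.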