
support adding extra commit metadata with SQL in Spark #4956

Merged
rdblue merged 18 commits into apache:master from CodingCat:metadata_thread
Jun 6, 2022

@CodingCat
Contributor

This PR lets users add extra commit metadata when operating on tables with SQL. It also lets users commit data to tables from multiple threads while keeping the metadata thread-local.

New usage:

import scala.collection.JavaConverters._ // provides .asJava

(0 until 10).foreach { _ =>
  new Thread() {
    override def run(): Unit = {
      // The properties are visible only to this thread while the callable runs.
      CommitMetadata.withCommitProperties(
        Map("metadata-key" -> "thread-local-metadata-value").asJava,
        () => {
          SparkSession.getActiveSession.get.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')")
        })
    }
  }.start()
}
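
For illustration, here is a minimal Java sketch of how a helper like this could keep properties thread-local for the duration of a callable. It is not the code from this PR: the class name `ThreadLocalCommitProperties` is hypothetical, and wrapping checked exceptions in `RuntimeException` is a simplification chosen for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the helper discussed in this PR; not the actual Iceberg class.
final class ThreadLocalCommitProperties {
  // Each thread sees its own map; the default is an empty map.
  private static final ThreadLocal<Map<String, String>> PROPS =
      ThreadLocal.withInitial(Collections::emptyMap);

  private ThreadLocalCommitProperties() {}

  // Runs the callable with the given properties visible to the calling thread only.
  static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable) {
    PROPS.set(Collections.unmodifiableMap(new HashMap<>(properties)));
    try {
      return callable.call();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      // Simplification for this sketch: wrap checked exceptions.
      throw new RuntimeException(e);
    } finally {
      // Reset so later work on this thread does not inherit the values.
      PROPS.remove();
    }
  }

  // What a writer running on the current thread would read at commit time.
  static Map<String, String> commitProperties() {
    return PROPS.get();
  }

  public static void main(String[] args) {
    String seen = withCommitProperties(
        Collections.singletonMap("metadata-key", "thread-local-metadata-value"),
        () -> commitProperties().get("metadata-key"));
    System.out.println("inside callable: " + seen);
    System.out.println("after callable: " + commitProperties());
  }
}
```

Taking a callable, rather than exposing separate set and clear calls, ties the lifetime of the properties to one block of code on one thread.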

@CodingCat
Contributor Author

Hi @kbendick @rdblue, I opened this PR as a follow-up to #4795. Please help review it, and thanks in advance! Once we agree on the approach here, I will add the changes to the other Spark versions.

String tableLocation = temp.newFolder("iceberg-table").toString();
HadoopTables tables = new HadoopTables(CONF);
int threadsCount = 3;
ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
Contributor

Can you use the helper in ThreadPools and also wrap it in a try/finally to close the threadpool?

I'm also not entirely sure this requires a threadpool to test. I think it would be fine to test a single write in the current thread.

Contributor

I would also prefer a test that uses a single write in the current thread without any additional threading business. I worry that CommitMetadata doesn't seem thread local to users.

And then if a multi-threaded test is needed, using the helpers from ThreadPools as suggested.

Contributor Author

Updated to use ThreadPools. I think multi-threaded testing is still necessary, though: we need something guarding that the commit metadata handling is thread-safe, whether we keep using ThreadLocal as we do now or later switch to something else for any reason.

Contributor

I don't think multi-threaded testing is needed. It's enough to know that we're using a thread-local. This also is not guaranteed to run the way this test assumes that it will. There is not a guarantee that the thread pool will scale all the way up, and there's no guarantee that the tasks will each run in a separate thread. I think it's likely that those will happen, but this could still be a source of flakiness later on. Also, this doesn't necessarily test that the thread-local is working properly because there's no guarantee of concurrency across tasks.

While it's probably working the way you expect, there's no guarantee that it must. So I'd prefer to keep the test simple.
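
As a small illustration of the point that tasks are not guaranteed to each run in a separate thread, the following Java sketch records which worker thread ran each task. The class and method names are hypothetical, and the one-worker pool is simply the most extreme case, chosen because its outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the PR.
final class PoolThreadReuseDemo {
  private PoolThreadReuseDemo() {}

  // Submits `tasks` jobs to a pool of `poolSize` workers and returns the
  // distinct names of the threads that actually ran them.
  static Set<String> distinctWorkerThreads(int poolSize, int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      List<Future<String>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(pool.submit(task));
      }
      Set<String> names = new HashSet<>();
      for (Future<String> future : futures) {
        names.add(future.get());
      }
      return names;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // With one worker, every task shares the same thread, so a test built on
    // such tasks would not exercise thread-local isolation at all.
    System.out.println(distinctWorkerThreads(1, 5));
    // With three workers, the number of distinct threads is at most three.
    System.out.println(distinctWorkerThreads(3, 5).size());
  }
}
```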

Contributor Author

updated

/**
* utility class to accept thread local commit properties
*/
public class CommitMetadata {
Contributor

@kbendick kbendick Jun 3, 2022

Does this class need to be public? Could it be made package-private?

I have concerns about using ThreadLocal for something that in most cases doesn't need to be thread-local. I don't want to give users too much room to hurt themselves: they may not consider that CommitMetadata is only thread-local, and then their writes won't work properly in the common case of writes without user-side multithreading (e.g. it gets set in one thread somewhere, but another thread is used for the commit).

EDIT: Since this takes a Callable, it's less of a concern. I would still name it in a way that's a bit more reflective of its thread-local nature (especially if we wanted a CommitMetadata class one day that doesn't require a callable and is persistent). That, and I always prefer things to be package-private if possible.
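
The failure mode described here (a value set in one thread, with the commit happening in another) can be reproduced with plain `ThreadLocal`. The sketch below is only an illustration; the class name `ThreadLocalVisibilityDemo` is hypothetical and nothing in it comes from the PR.

```java
// Hypothetical demo class; not part of the PR.
final class ThreadLocalVisibilityDemo {
  private static final ThreadLocal<String> VALUE = new ThreadLocal<>();

  private ThreadLocalVisibilityDemo() {}

  // Sets a value on the calling thread, then reads the same ThreadLocal
  // from a different thread and returns what that thread saw.
  static String readFromOtherThread(String value) {
    VALUE.set(value);
    try {
      String[] seen = new String[1];
      Thread other = new Thread(() -> seen[0] = VALUE.get());
      other.start();
      other.join(); // join() makes the write to seen[0] visible here
      return seen[0];
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      VALUE.remove();
    }
  }

  public static void main(String[] args) {
    // The other thread never had the value set, so it reads null.
    System.out.println(readFromOtherThread("metadata-value"));
  }
}
```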

Contributor Author

sure, changed to CallerWithCommitMetadata....but...eh...not sure if it is better or worse....

Contributor

> Does this class need to be public? Could it be made package-private?

Yes, this does need to be public because it is a way for Iceberg users to pass metadata.

Contributor

@kbendick kbendick left a comment

Thinking this over more, there's really no way to achieve this use-case without this. So outside of the style nits Ryan mentioned, this looks pretty much good to me.

Thanks @CodingCat!

@CodingCat
Contributor Author

Thanks, @rdblue and @kbendick! I just updated the PR.

@CodingCat
Contributor Author

Just to confirm: it seems we don't support SQL-based table insert/merge in Spark 2.4 at all, right? (So this functionality is not relevant there.)

Contributor

@rdblue rdblue left a comment

What was wrong with the CommitMetadata name? I think we should have a simpler name than CallerWithCommitMetadata. That's a bit too confusing. I think the original name was good.

@CodingCat
Contributor Author

> What was wrong with the CommitMetadata name? I think we should have a simpler name than CallerWithCommitMetadata. That's a bit too confusing. I think the original name was good.

In #4956 (comment), @kbendick mentioned a potential conflict with SS's CommitMetadata. I don't have a strong opinion on this.

return callable.call();
} catch (Throwable e) {
ExceptionUtil.castAndThrow(e, exClass);
return null;
Contributor

[minor] Is this required, given that we throw in the line above?

Contributor

I don't think the compiler sees that castAndThrow will always throw, so it needs this to know what to do.
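
To see why the compiler needs the extra `return`, consider the following self-contained Java sketch. The `castAndThrow` here is a hypothetical stand-in written for this example, modeled on the call in the snippet above, and is not the project's implementation. Because it is declared `void`, javac assumes the call can complete normally, and removing `return null` produces a "missing return statement" error.

```java
import java.util.concurrent.Callable;

// Hypothetical demo class; not part of the PR.
final class AlwaysThrowsDemo {
  private AlwaysThrowsDemo() {}

  // Stand-in helper: always throws, but its void signature does not tell the compiler so.
  static <E extends Exception> void castAndThrow(Throwable t, Class<E> exClass) throws E {
    if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    } else if (t instanceof Error) {
      throw (Error) t;
    } else if (exClass.isInstance(t)) {
      throw exClass.cast(t);
    }
    throw new RuntimeException(t);
  }

  static <R, E extends Exception> R call(Callable<R> callable, Class<E> exClass) throws E {
    try {
      return callable.call();
    } catch (Throwable e) {
      castAndThrow(e, exClass);
      // Never reached at runtime, but required for the method to compile.
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(call(() -> 42, RuntimeException.class));
  }
}
```

An alternative design is to have the helper return the exception so that the caller writes `throw helper(...)`, which lets the compiler see that control flow ends there.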

Comment on lines 178 to 180
if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
CallerWithCommitMetadata.commitProperties().forEach(operation::set);
}
Contributor

@singhpk234 singhpk234 Jun 6, 2022

[doubt] (Probably not in scope of this PR.) Should we add validation that the keys passed from here, as well as extraSnapshotMetadata, don't override keys already present in the snapshot summary? Or is this functionality intended to allow that?

Contributor

I don't think that we need to worry about this. It is unlikely to conflict and if it does conflict, it's up to the caller to decide what to do.
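
If a caller does want to check for collisions before passing properties, a sketch along these lines would be enough. Everything here is hypothetical: the class name, the idea that the caller has the existing summary keys available as a map, and the last-write-wins merge, which is an assumption made for the example rather than something stated in this thread.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical caller-side helper; not part of the PR.
final class CommitPropertyConflicts {
  private CommitPropertyConflicts() {}

  // Returns the keys in `extra` that already exist in `summary`.
  static Set<String> conflictingKeys(Map<String, String> summary, Map<String, String> extra) {
    Set<String> conflicts = new TreeSet<>(extra.keySet());
    conflicts.retainAll(summary.keySet());
    return conflicts;
  }

  // Applies `extra` on top of `summary`; on a shared key the value from `extra` wins
  // (an assumption of this sketch).
  static Map<String, String> applyExtra(Map<String, String> summary, Map<String, String> extra) {
    Map<String, String> merged = new LinkedHashMap<>(summary);
    extra.forEach(merged::put);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> summary = Collections.singletonMap("added-records", "2");
    Map<String, String> extra = new LinkedHashMap<>();
    extra.put("added-records", "99");
    extra.put("metadata-key", "value");
    System.out.println("conflicts: " + conflictingKeys(summary, extra));
    System.out.println("merged: " + applyExtra(summary, extra));
  }
}
```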

Comment on lines 196 to 198
if (!CallerWithCommitMetadata.commitProperties().isEmpty()) {
CallerWithCommitMetadata.commitProperties().forEach(operation::set);
}
Contributor

@singhpk234 singhpk234 Jun 6, 2022

[question] Should we also add this to SparkPositionDeltaWrite, since Iceberg supports MOR with position deletes starting with 3.2?

Contributor

Yes, that's a good idea.

@rdblue
Contributor

rdblue commented Jun 6, 2022

@CodingCat, I think renaming the class back is about the only thing left to fix.

@kbendick
Contributor

kbendick commented Jun 6, 2022

> > What was wrong with the CommitMetadata name? I think we should have a simpler name than CallerWithCommitMetadata. That's a bit too confusing. I think the original name was good.
>
> In #4956 (comment), @kbendick mentioned a potential conflict with SS's CommitMetadata. I don't have a strong opinion on this.

I’m good with the original name too (and prefer it to the new one).

@CodingCat
Contributor Author

@kbendick @rdblue @singhpk234 updated the PR accordingly, thanks!

@rdblue rdblue merged commit 0adf678 into apache:master Jun 6, 2022
@rdblue
Contributor

rdblue commented Jun 6, 2022

Thanks, @CodingCat!

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
apache#4956)

This is needed because Spark cannot pass additional metadata for some operations.
4 participants