Flink: Ignore the Forbidden when creating a database #7795
Conversation
It may be that the user running a Flink job doesn't have the privileges to create a database. In that case we just assume that it already exists.
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Looks reasonable to me.
```java
if (!databaseExists(defaultDatabase)) {
  try {
    createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
  } catch (DatabaseAlreadyExistException e) {
    // ignored: another job may have created the database concurrently
  }
}
```
This is also a little weird, since we already did the databaseExists check earlier. I understand why it is needed: it handles the race condition when multiple jobs run this code path concurrently. The first attempt of ignoring ForbiddenException was still not ideal to me, because it can mask a real issue where the default database wasn't created due to a permission problem.
I checked the HiveCatalog and GenericInMemoryCatalog implementations in the Flink code. They don't try to create the default database in the open method; Flink's HiveCatalog just asserts that the default database exists. That behavior seems reasonable to me.
cc @pvary
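The race the comment above describes can be sketched with plain Java collections. This is a hypothetical illustration, not the real FlinkCatalog code: two jobs can both observe that the database is missing and then both attempt to create it, so the create path must tolerate "already exists" (the `ignoreIfExists` parameter here mirrors the boolean third argument of `createDatabase` in the snippet).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the check-then-create race discussed above.
class CreateRaceSketch {
  private final ConcurrentMap<String, Boolean> databases = new ConcurrentHashMap<>();

  boolean databaseExists(String name) {
    return databases.containsKey(name);
  }

  void createDatabase(String name, boolean ignoreIfExists) {
    // putIfAbsent returns non-null when another caller won the race
    boolean lostRace = databases.putIfAbsent(name, Boolean.TRUE) != null;
    if (lostRace && !ignoreIfExists) {
      throw new IllegalStateException("Database already exists: " + name);
    }
  }
}
```

Catching DatabaseAlreadyExistException in the original snippet serves the same purpose as the `ignoreIfExists` flag here: the second creator simply proceeds instead of failing the job.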
Are you suggesting removing the create-database call? The person who initially reported this issue was very surprised that this call was there, so removing it seems reasonable to me.
Yeah, I was thinking about removing the create-database call and just adding a Preconditions check that the default database exists. That is also consistent with Flink's HiveCatalog implementation. The Preconditions check is also debatable: I am not very clear about the expectations for the catalog. Is the default database always expected/required? Not sure if @rdblue has any input.
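The suggested alternative can be sketched without any Flink dependencies. This is a hypothetical illustration (the class and field names are made up, and a real implementation would use Guava's Preconditions and the catalog's database listing): open() only verifies that the default database exists, like Flink's HiveCatalog, instead of creating it.

```java
import java.util.Set;

// Hypothetical sketch: open() asserts the default database exists
// rather than creating it, so no create permission is needed.
class AssertDefaultDatabaseSketch {
  private final Set<String> existingDatabases;
  private final String defaultDatabase;

  AssertDefaultDatabaseSketch(Set<String> existingDatabases, String defaultDatabase) {
    this.existingDatabases = existingDatabases;
    this.defaultDatabase = defaultDatabase;
  }

  void open() {
    // Fail fast with a clear message instead of masking a permission issue.
    if (!existingDatabases.contains(defaultDatabase)) {
      throw new IllegalStateException(
          "Default database " + defaultDatabase + " does not exist");
    }
  }
}
```

Failing fast here surfaces a missing default database at catalog startup, rather than as a confusing downstream error when the first table operation runs.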
I'd be okay with removing the check, although as long as attempting to create the database doesn't cause a failure, it is probably slightly better to create it.
@Fokko I am still leaning toward removing the create. This is a Flink Catalog implementation (not an Iceberg catalog). I think we can stick with the Flink HiveCatalog style, where the open method doesn't create the default database automatically.
@stevenzwu I agree. I would not expect writing data to also create a database. I'll update the PR.
I've updated the tests as well. I think this one is good to go, @stevenzwu.
* Hive: Set commit state as Unknown before throwing CommitStateUnknownException (apache#7931) (apache#8029)
* Spark 3.4: WAP branch not propagated when using DELETE without WHERE (apache#7900) (apache#8028)
* Core: Include all reachable snapshots with v1 format and REF snapshot mode (apache#7621) (apache#8027)
* Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (apache#8033) (apache#8036)
* Flink: remove the creation of default database in FlinkCatalog open method (apache#7795) (apache#8039)
* Core: Handle optional fields (apache#8050) (apache#8064) — we expect current-snapshot-id, properties, and snapshots to be there, but they are actually optional. Use AssertJ.
* Core: Abort file groups should be under same lock as committerService (apache#7933) (apache#8060)
* Spark 3.4: Fix rewrite_position_deletes for certain partition types (apache#8059)
* Spark 3.3: Fix rewrite_position_deletes for certain partition types (apache#8059) (apache#8069)
* Spark: Add actions for disaster recovery; fix the compile error; fix merge conflicts and formatting; all tests are working and code integrated with Spark 3.3; fix union error and snapshots test; fix Spark broadcast error; add RewritePositionDeleteFilesSparkAction

Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Xianyang Liu <liu-xianyang@hotmail.com>
Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
Co-authored-by: Yufei Gu <yufei_gu@apple.com>
Co-authored-by: yufeigu <yufei@apache.org>
Co-authored-by: Laith Alzyoud <laith.alzyoud@revolut.com>
Co-authored-by: vaultah <4944562+vaultah@users.noreply.github.com>