-
Notifications
You must be signed in to change notification settings - Fork 427
Unity Catalog Export: Reduce likelihood for action failure, due to race condition against databricks on table re-registration #9937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
arielshaqed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately I do not believe this resolves the race. Specifically, if multiple concurrent Lua hooks try to create-or-replace, then it is possible for one to succeed, one to fail, and no table exist. It might work if you have more retries than concurrent hooks running... but that of course is not a bounded number (and the higher the load, the more concurrent hooks we expect).
| Thread A | Thread B |
|---|---|
| CREATE TABLE T | - |
| table exists | - |
| - | CREATE TABLE T |
| - | table exists |
| dropIfExists T | dropIfExists T |
| CREATE TABLE T | CREATE TABLE T |
| succeeds! | table exists |
| - | dropIfExists T |
| - | table T no longer exists |
| - | CREATE TABLE T |
| - | some random error |
Result: A succeeded, B failed, table does not exist. This is not a possible result for two concurrent CREATE OR REPLACE TABLE T calls.
pkg/actions/lua/databricks/client.go
Outdated
| bo.MaxInterval = 3 * time.Second | ||
| bo.MaxElapsedTime = 10 * time.Second | ||
|
|
||
| deleteAndCreate := func() (string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
createOrDelete?
|
If there is no direct programmatic access to the |
|
@arielshaqed This indeed does not solve the race between two running actions (though I do think it can decrease the amount of errors caused by such race). This tries to resolve timing issues where Databricks delete operations on large tables may not complete immediately, causing the retry to still see the table as existing. |
|
|
Thank you @arielshaqed, Using "For Delta tables, the table inherits its configuration from the LOCATION if data already exists at that path. As a result, any specified TBLPROPERTIES, table_specification, or PARTITIONED BY clauses must exactly match the existing data at the Delta location." see docs |
pkg/actions/lua/databricks/client.go
Outdated
| if err != nil { | ||
| return err | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see dead code
pkg/actions/lua/databricks/client.go
Outdated
| func (client *Client) deleteTableIfExists(catalogName, schemaName, tableName string) error { | ||
| err := client.deleteTable(catalogName, schemaName, tableName) | ||
| if errors.Is(err, databricks.ErrResourceDoesNotExist) { | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't see reason for this function as it just ignore 'not exists' while we add a check if table exists in this flow.
If we check existance and we delete the table and ignore the result, we will fail in the creation - so I don't see a reason to ignore the error.
pkg/actions/lua/databricks/client.go
Outdated
| func (client *Client) checkTableExists(catalogName, schemaName, tableName string) bool { | ||
| table, err := client.workspaceClient.Tables.GetByFullName(client.ctx, tableFullName(catalogName, schemaName, tableName)) | ||
| if err == nil && table != nil { | ||
| return true | ||
| } | ||
| return false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- we ignore the error here
- when you return a boolean value you can return the expression result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, indeed looking better this way. still ignoring the err as if there is an unrelated err it will get caught when creating a table, is that ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no we should not ignore the error as it will not help us understand if something is not working as expected.
in this case we are checking that the table exists and delete the table - if we fail we need to know why, no matter the reason.
pkg/actions/lua/databricks/client.go
Outdated
| panic("unreachable") | ||
| } | ||
| } else { | ||
| // try and delete the table first if exists, to prevent "table already exists" errors in databricks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without "try". Delete the table if exists.
pkg/actions/lua/databricks/client.go
Outdated
| return 1 | ||
| } | ||
|
|
||
| func (client *Client) createExternalTableWithBackoff(warehouseID, catalogName, schemaName, tableName, location string, metadata map[string]any) (string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need this function to use retry
pkg/actions/lua/databricks/client.go
Outdated
| if deletionErr != nil { | ||
| return "", backoff.Permanent(deletionErr) | ||
| } | ||
| return "", err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we think the system is eventual consist, and we create+delete in a loop it may fail as we can delete the table we just created.
we like to try only the table creation as discussed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, Thank you.
pkg/actions/lua/databricks/client.go
Outdated
| status, err := backoff.RetryWithData(createTableBO, bo) | ||
|
|
||
| if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace
| status, err := backoff.RetryWithData(createTableBO, bo) | |
| if err != nil { | |
| status, err := backoff.RetryWithData(createTableBO, bo) | |
| if err != nil { |
pkg/actions/lua/databricks/client.go
Outdated
|
|
||
| func (client *Client) checkTableExists(catalogName, schemaName, tableName string) bool { | ||
| _, err := client.workspaceClient.Tables.GetByFullName(client.ctx, tableFullName(catalogName, schemaName, tableName)) | ||
| return err == nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- If there is additional code this function just calls a function, so I'm not sure we need it.
- We ignore errors here, so there is no difference between errors access the service and table not exists
pkg/actions/lua/databricks/client.go
Outdated
| bo := backoff.NewExponentialBackOff() | ||
| bo.InitialInterval = 100 * time.Millisecond | ||
| bo.MaxInterval = 3 * time.Second | ||
| bo.MaxElapsedTime = 10 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to also add the context
pkg/actions/lua/databricks/client.go
Outdated
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | ||
| if err == nil { | ||
| return status, nil | ||
| } | ||
| if alreadyExists(err) { | ||
| return "", err | ||
| } | ||
| return "", backoff.Permanent(err) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to check error first
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | |
| if err == nil { | |
| return status, nil | |
| } | |
| if alreadyExists(err) { | |
| return "", err | |
| } | |
| return "", backoff.Permanent(err) | |
| } | |
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | |
| if err != nil { | |
| if alreadyExists(err) { | |
| return "", err | |
| } | |
| return "", backoff.Permanent(err) | |
| } | |
| return status, nil | |
| } |
|
@nopcoder , thanks, now panicing if we get non related err when checkin its existence. Is that ok? |
nopcoder
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments; note that you call panic in the code to mark this code as a return as it will not get to that point, the errorf will panic.
pkg/actions/lua/databricks/client.go
Outdated
| } | ||
| } else { | ||
|
|
||
| // delete the table first if exists, to prevent "table already exists" errors in databricks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is for the delete request.
here you need to comment that this call is done to check if table exists.
| _, err := client.workspaceClient.Tables.GetByFullName(client.ctx, tableFullName(catalogName, schemaName, tableName)) | ||
| if err != nil && !errors.Is(err, databricks.ErrResourceDoesNotExist) { | ||
| lua.Errorf(l, "%s", err.Error()) | ||
| panic("unreachable") | ||
| } | ||
| if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on the condition at the end that if no error, table exists and we need to delete it before creation.
note: the previous version had it in a function that just was a call, you can keep the function if you like to include the "is exists" logic, but you need to report the error.
|
Thanks again @nopcoder, kept the table checking without the func and adjusted the comments. |
nopcoder
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- added code style comment
- about the backoff parameters - check the logs to see if it is ok to perform ~6 calls in 2 seconds. this pr is assumption about eventual consist; so I suggest to reproduce first.
- if there are any pending comments from Ariel please review before merge.
pkg/actions/lua/databricks/client.go
Outdated
| var bo backoff.BackOff = backoff.NewExponentialBackOff( | ||
| backoff.WithInitialInterval(100*time.Millisecond), backoff.WithMaxInterval(3*time.Second), backoff.WithMaxElapsedTime(10*time.Second)) | ||
| bo = backoff.WithContext(bo, client.ctx) | ||
|
|
||
| createTableBO := func() (string, error) { | ||
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | ||
| if err != nil { | ||
| if alreadyExists(err) { | ||
| return "", err | ||
| } | ||
| return "", backoff.Permanent(err) | ||
| } | ||
| return status, nil | ||
| } | ||
|
|
||
| status, err := backoff.RetryWithData(createTableBO, bo) | ||
| if err != nil { | ||
| lua.Errorf(l, "%s", err.Error()) | ||
| panic("unreachable") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style
| var bo backoff.BackOff = backoff.NewExponentialBackOff( | |
| backoff.WithInitialInterval(100*time.Millisecond), backoff.WithMaxInterval(3*time.Second), backoff.WithMaxElapsedTime(10*time.Second)) | |
| bo = backoff.WithContext(bo, client.ctx) | |
| createTableBO := func() (string, error) { | |
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | |
| if err != nil { | |
| if alreadyExists(err) { | |
| return "", err | |
| } | |
| return "", backoff.Permanent(err) | |
| } | |
| return status, nil | |
| } | |
| status, err := backoff.RetryWithData(createTableBO, bo) | |
| if err != nil { | |
| lua.Errorf(l, "%s", err.Error()) | |
| panic("unreachable") | |
| } | |
| bo := backoff.NewExponentialBackOff( | |
| backoff.WithInitialInterval(100*time.Millisecond), | |
| backoff.WithMaxInterval(3*time.Second), | |
| backoff.WithMaxElapsedTime(10*time.Second), | |
| ) | |
| status, err := backoff.RetryWithData(func() (string, error) { | |
| status, err := client.createExternalTable(warehouseID, catalogName, schemaName, tableName, location, metadataMap) | |
| if err != nil { | |
| if alreadyExists(err) { | |
| return "", err | |
| } | |
| return "", backoff.Permanent(err) | |
| } | |
| return status, nil | |
| }, backoff.WithContext(bo, client.ctx)) | |
| if err != nil { | |
| lua.Errorf(l, "%s", err.Error()) | |
| panic("unreachable") | |
| } |
|
This PR does not add support to any new feature, only fixes somewhat broken ones |
arielshaqed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New code has less risk of a really bad outcome to the race that the previous version on this PR. But it will (almost?) always give the same result as before: the race still exists, the winner will always create the table, the loser will never create the table, it will just take longer for the loser to lose.
If we want to do this, you might perform the change I suggest about deletions and also get rid of the retries when creating the table: they do nothing.
pkg/actions/lua/databricks/client.go
Outdated
| // check whether the the table already exists | ||
| _, err := client.workspaceClient.Tables.GetByFullName(client.ctx, tableFullName(catalogName, schemaName, tableName)) | ||
| if err != nil && !errors.Is(err, databricks.ErrResourceDoesNotExist) && !strings.Contains(err.Error(), "does not exist") { | ||
| lua.Errorf(l, "%s", err.Error()) | ||
| panic("unreachable") | ||
| } | ||
| // in case there is no error, table exists and we will delete it before creating a new one | ||
| if err == nil { | ||
| err = client.deleteTable(catalogName, schemaName, tableName) | ||
| if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You anyway want to delete the table. Checking if it exists first is just another race. Instead, delete the table, keep going if DataBricks say it does not exist.
|
@arielshaqed @nopcoder @Isan-Rivkin Running a simple script to push external table to databricks (from canadacentral) both with the original client and the one in this pr. Getting weird, inconsistent behaviors. An important observation is that no failures occurred when running the test locally, only when running in canada central. for the original client, I did manage to reproduce the original problem: This happens though only it the initial iteration (after first success it continues registering successfully) for the client in this pr I got for example meaning table's existence was checked, then upon deletion, the table was absent (added a check that the error is not It also sometimes failed after a couple of iterations over also failed like this in the first iteration but less times than the original client All in all, results were very inconsistent. failures appeared for both clients. Failures also tend to group over few consecutive runs. the original client did fail more. UPDATE |
|
The test func: |
|
@arielshaqed Using only databricks's api to delete and create external table. Running into the same errors: |
|
Thanks! This latest certainly indicates dropping a table on DataBricks only applies eventually. Unfortunately, having to retry over 100 seconds (and just in a single trial - a bad day will be much worse) makes me wonder how this fix can help! It is one thing to tell users not to perform concurrent commits and/or merges. But post-commit hooks (where we can perform this action) are asynchronous. Now we will need users to space their commits out 2 minutes or more apart from each other. |
|
Removed myself as a reviewer because I keep getting notification reminders 🙈 |
Closes https://github.com/treeverse/product/issues/1030
This PR tries to resolve two unwanted behaviors;
For each export to an existing table, two
create tablecalls were made (first fails as the table exists) resulting in excessive unwanted error logs in databricks.We try to overcome this be checking whether the table exists first and remove it if so.
The second issue is inconsistency in case of re-registration of a table - after successfully deleting a table and trying to create a new one, databricks returns an answer that the table already exists.
lakeFS action logs show
Error: action failed: runtime error: [string "lua"]:60: external table "intervaldata" creation failed: FAILED: BAD_REQUEST [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `prod_datahub`.`ami`.`intervaldata` because it already exists.which means the client had to fail here after already deleting the table.