
fix: read vacuumed delta log without _last_checkpoint #643

Merged

roeap merged 9 commits into delta-io:main from the incomplete-logs branch on Jun 21, 2022

Conversation

roeap (Collaborator) commented Jun 15, 2022

Description

This PR addresses reading the delta log when part of the log has been removed and no _last_checkpoint file is present.

The original issue triggering this was related to service principal authentication with Azure. After recreating everything locally, I was able to read tables from Azure with service principals without problems. From the discussion in #600 and #609 I gathered that this may be related to how we find an entry point into parsing the log - i.e. relying on either a _last_checkpoint file or version 0 being present. Looking at some of our tables created with Spark and reading the specs, I think a _last_checkpoint file is not mandatory.

With that I also needed to change how history is resolved when the limit exceeds the available log files. Here I opted for finding the lowest log entry, but not reading any checkpoint to "fill up" to the requested limit. The main reason is that I did not want to dive into figuring out the versions from the data in the checkpoint file (if that is even possible), since they are reconciled.
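
As an illustration (not the code in this PR), a minimal sketch of deriving the lowest available version purely from the file names listed under _delta_log could look like the following; the helper name and its input are hypothetical:

// Hypothetical helper: derive the earliest available commit version from the
// file names listed under _delta_log, without relying on _last_checkpoint.
fn earliest_available_version(log_file_names: &[String]) -> Option<i64> {
    log_file_names
        .iter()
        // keep only JSON commit files, e.g. "00000000000000000005.json"
        .filter_map(|name| name.strip_suffix(".json"))
        // parse the zero-padded version number from the file stem
        .filter_map(|stem| stem.parse::<i64>().ok())
        .min()
}

// e.g. earliest_available_version(&["00000000000000000004.json".to_string()]) == Some(4)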

Related Issue(s)

maybe-closes #600

Documentation

roeap (Collaborator, Author) commented Jun 15, 2022

@houqp - tests seem to be failing for concurrent writes, and I am not quite sure what might be causing this. Do you have any pointers on where to start investigating?

@roeap roeap force-pushed the incomplete-logs branch from 00430f5 to bbbb9e6 on June 18, 2022 20:21
@roeap roeap marked this pull request as draft June 18, 2022 20:21
Comment on lines 95 to 103
let path = String::from(entry.path().to_str().unwrap());
// We check if the file is a temporary file created to prepare a commit.
// Getting meta data is another system call on some platforms and the file
// may have been removed in the meantime ...
let modified = if path.contains("_commit_") {
    chrono::Utc::now()
} else {
    DateTime::from(entry.metadata().await.unwrap().modified().unwrap())
};
Collaborator Author:

I'm not really happy with this solution, especially since we cannot do this if we are to move to an external object store crate. I toyed around with using a separate "directory" for preparing the commit files, so that ephemeral files do not show up in the list operation at all, but some errors popped up there, seemingly due to renaming into a folder that does not exist yet when creating a new table.

As is, all concurrent tests are passing, so I thought I'd ask for other people's opinions.

FYI - the implementation in object_store_rs is quite different, and this problem might not show up there in that form, since there are no "unwraps" over there...

Collaborator:

Perhaps a more general logic (that would be acceptable in object_store_rs) would be to skip the entry if grabbing metadata resulted in NotFound?

@roeap roeap marked this pull request as ready for review June 18, 2022 22:08
@roeap roeap requested review from wjones127, houqp and fvaleye June 18, 2022 22:08
@roeap roeap changed the title read vacuumed delta log without _last_checkpoint fix: read vacuumed delta log without _last_checkpoint Jun 18, 2022
// We check if the file is a temporary file created to prepare a commit.
// Getting meta data is another system call on some platforms and the file
// may have been removed in the meantime ...
let modified = if path.contains("_commit_") {
Member:

Good catch on this race condition!

I think there is still a race here because other files could be deleted through vacuuming as well. What do you think about matching on the metadata error and ignoring the path if the error matches Os { code: 2, kind: NotFound }?
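
A minimal sketch of that suggestion (hypothetical helper; assuming the entry is a tokio::fs::DirEntry, as the .await in the snippet above suggests):

use std::io;

// Hypothetical sketch: fetch the modification time for a listed entry, but
// return None (skip the entry) if the file has been removed in the meantime,
// e.g. by a concurrent commit rename or a vacuum.
async fn modified_or_skip(entry: &tokio::fs::DirEntry) -> io::Result<Option<std::time::SystemTime>> {
    match entry.metadata().await {
        Ok(meta) => Ok(Some(meta.modified()?)),
        // The file vanished between the listing and the metadata syscall.
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}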

Member:

ha, nvm, @wjones127 beat me to it :D

@roeap roeap force-pushed the incomplete-logs branch from 98c7fc3 to 9d0d79f on June 19, 2022 05:14
roeap (Collaborator, Author) commented Jun 19, 2022

@houqp, @wjones127 - thanks for the feedback! We are now handling errors in list_objs, and consumers seem to handle that well 😄.

})?),
})
}
Err(err) if err.kind() == io::ErrorKind::NotFound => Err(StorageError::NotFound),
Collaborator:

IIUC, this means we will error in the case of a temporary commit being moved. Is that what we want (versus skipping that entry)?

I still lean towards ignoring. I think it would be unexpected to call list_dir and get a not found error back.

roeap (Collaborator, Author) commented Jun 19, 2022

Just making sure we are talking about the same thing. My intent was (and I hope that's what the code is doing :D) to return an Err element in the returned stream - you would still get the "successful" stream, just with one or some of the items being Errs. I went this way since I believe this is what object_store would do.

If we agree, I'd be happy to go through some Option / filter_map dance to filter out the errors.
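
For example, a minimal sketch of that dance (hypothetical helper, assuming only the futures crate and a stream of Result items as discussed above) could look like:

use futures::stream::{self, StreamExt};

// Hypothetical sketch: given a stream yielding Result items, drop the Err
// elements and keep only the successfully listed ones.
async fn collect_ok<T, E>(items: impl futures::Stream<Item = Result<T, E>>) -> Vec<T> {
    items
        .filter_map(|item| async move { item.ok() })
        .collect::<Vec<_>>()
        .await
}

fn main() {
    futures::executor::block_on(async {
        let listed = stream::iter(vec![Ok::<_, String>("00000000000000000000.json"), Err("gone".into())]);
        assert_eq!(collect_ok(listed).await, vec!["00000000000000000000.json"]);
    });
}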

Collaborator:

I see, so the elements yielded by the stream are Result<ObjectMeta> rather than ObjectMeta. Then you could filter those for the non-error elements.

You are right, the current behavior in object_store_rs (at least for list_dir()) is to return the error for that item in the stream. (list_with_delimiter errors early though; not sure if we want to fix that.)

Member:

Are we sure the list stream consumer is supposed to be able to ignore the error while iterating through the stream? If list_with_delimiter is performing an early exit, then I think we should keep the behavior consistent for list as well.

Collaborator Author:

Here I think we want to avoid an early exit, since we will be hitting these races in our tests :). The root cause is that we need to make a separate request on Linux systems to get the modification time. object_store_rs uses walkdir - not sure if this is the same there. My "feeling" is that within the arrow world we are more often than not returning streams of results rather than objects directly.

As for list_with_delimiters - I think this is the "secondary" API. The current object store in datafusion also consistently returns result streams.

If we want an early exit though, I think we have to revert to some sort of logic to capture / filter the "expected" errors we see in the tests. Not sure where Spark writes its temporary files, but if we were to write them outside of _delta_log, the concurrent commit scenario just goes away, and we "only" have to deal with the vacuum case. Of course, all of this only applies to the local file system on Linux, where in practice I assume concurrent commits are much rarer than in cases where the table is backed by cloud storage.

My personal favourite is the current solution, since a stream of results seems most common across all implementations I have seen so far.

Collaborator:

Yeah I agree with @roeap here. The existing list_with_delimiters in object_store_rs clearly needs to be reworked.

Member:

Sorry for the confusion, I should have provided more context in my previous comment :D I was referring to catching the race error within the list API and only returning truly unrecoverable early-exit errors back to the caller. I mostly want to be extra careful about whether we are interpreting the semantics of Result<ObjectMeta> correctly in object_store_rs, so that when we switch over we won't have subtle bugs in the code, e.g. not performing an early exit in the caller when we should.

To me the core of the problem is that our listing and metadata fetch are not done in a single atomic system call. For cloud object stores, the metadata comes as part of the list API response, so it is atomic. I haven't had the time to look into walkdir yet, but if we can get atomicity through that, then we should definitely switch over 👍 The current implementation is not very scalable for a large number of files because it is very inefficient to issue one syscall per file. The next best thing is to "simulate" this atomic listing and metadata lookup operation by ignoring the not found OS error within the list implementation.

That said, if the Result<ObjectMeta> in object_store_rs does include errors that can be ignored by the caller, then it would be better to return the not found error back to the caller like you implemented it right now.

On a high level though, I do think that list and list_with_delimiters should have a consistent interface in object_store_rs, but that's a much longer conversation to be carried upstream and surely out of scope for this change ;)

Collaborator:

I was referring to catching the race error within the list API and only returning truly unrecoverable early-exit errors back to the caller.

I think both our current solution and object_store_rs will return NotFound errors.

To handle this on our side, we could add a check for NotFound error here:

delta-rs/rust/src/delta.rs

Lines 699 to 703 in a6ebbaf

let mut stream = self.storage.list_objs(&self.log_uri).await?;
while let Some(obj_meta) = stream.next().await {
    // Exit early if any objects can't be listed.
    let obj_meta = obj_meta?;

haven't had the time to look into walkdir yet, but if we can get atomicity through that, then we should definitely switch over

I don't think we can get atomicity through that, unfortunately. It seems to just call fs::metadata on each entry internally.

The next best thing is to "simulate" this atomic listing and metadata lookup operation by ignoring the not found OS error within the list implementation.

I'm open to this in the future.
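
For reference, a minimal sketch of that caller-side check (ListError is a stand-in for the real storage error type, and list_log_paths is a hypothetical consumer, not code from this PR):

use futures::stream::{Stream, StreamExt};

// Stand-in error type, for illustration only.
#[derive(Debug)]
enum ListError {
    NotFound,
    Other(String),
}

// Hypothetical sketch: consume a stream of listed paths, skipping entries
// that disappeared between listing and metadata lookup, and exiting early
// only on other errors.
async fn list_log_paths(
    mut entries: impl Stream<Item = Result<String, ListError>> + Unpin,
) -> Result<Vec<String>, ListError> {
    let mut paths = Vec::new();
    while let Some(item) = entries.next().await {
        match item {
            Ok(path) => paths.push(path),
            // A listed file vanished (concurrent commit or vacuum) - skip it.
            Err(ListError::NotFound) => continue,
            // Any other error still causes an early exit.
            Err(other) => return Err(other),
        }
    }
    Ok(paths)
}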

Collaborator Author:

I went through all the call sites of list_objs to see how we would handle errors in individual elements, and also added the error handling suggested by @wjones127 above.

  1. cleanup_expired_logs_for -> always fail, because we don't know if we are actually already cleaning the log.
  2. get_earliest_delta_log_version -> we may be vacuuming and the earliest version might have changed (used only for history).
  3. find_latest_check_point_for_version -> allow not found errors, since checkpoints should never be deleted?
  4. vacuum -> fail, since we may already be vacuuming.

We will always fail early if something happens on the list itself. None of this covers a scenario where someone might be messing with the logs through some other means - i.e. not a delta client. But I feel that if someone does that, it is at their own risk :).

While the handling strategies above might not be affected, I feel it would be a little more robust to keep the temporary commit files outside the _delta_log path. This would also avoid potential artefacts cluttering the log in case of failed / crashing commits where we may not get to clean up after the failure. If we want to go that route, I'd be happy to open a follow-up PR.

Generally though, this discussion led me to wonder whether we would need to support something like "concurrent vacuum". To some extent I feel these maintenance operations are outside normal operations, and owners / maintainers of a data asset should be careful enough in their work not to do such things - i.e. we mainly need to concern ourselves with the commit scenarios we are discussing here.

I think in that case, if we move the tmp files out of the log dir, we could always fail on any error. Scenarios where we want to access a listed file that has since been deleted would need to be retried by the caller anyhow.

Member:

I think both our current solution and object_store_rs will return NotFound errors.

Yes, I was trying to understand whether this is the intended behavior in object_store_rs. There is also the chance that it was an oversight ;)

get_earliest_delta_log_version -> we may be vacuuming and the earliest version might have changed (used only for history).

If the earliest version got changed due to compacting, shouldn't we ignore it and return the 2nd earliest version from the list? I think the really annoying part is that a lot of the list_objs calls are not even using the metadata, only the path, yet we are forced to incur this overhead and race condition everywhere :D

Generally though, this discussion led me to wonder whether we would need to support something like "concurrent vacuum".

I think in that case, if we move the tmp files out of the log dir, we could always fail on any error. Scenarios where we want to access a listed file that has since been deleted would need to be retried by the caller anyhow.

I think this is a fair assumption. This is actually the main reason why delta has the tombstone retention threshold set to 7 days by default, so we have a long enough retention period to avoid such read/vacuum race conditions.

wjones127
wjones127 previously approved these changes Jun 19, 2022
@roeap roeap dismissed stale reviews from wjones127 and ghost via dca80d3 June 20, 2022 22:00
houqp (Member) commented Jun 21, 2022

Thanks @roeap and @wjones127 for the great discussion!

@roeap roeap merged commit 422fbef into delta-io:main Jun 21, 2022
@roeap roeap deleted the incomplete-logs branch June 21, 2022 05:57