fix: read vacuumed delta log without _last_checkpoint #643
Conversation
@houqp - tests seem to be failing for concurrent writes, and I am not quite sure what might be causing this. Do you have any pointers on where to start investigating?
rust/src/storage/file/mod.rs
Outdated
let path = String::from(entry.path().to_str().unwrap());
// We check if the file is a temporary file created to prepare a commit.
// Getting meta data is another system call on some platforms and the file
// may have been removed in the meantime ...
let modified = if path.contains("_commit_") {
    chrono::Utc::now()
} else {
    DateTime::from(entry.metadata().await.unwrap().modified().unwrap())
};
I'm not really happy with this solution, especially since we cannot do this if we are to move to an external object store crate. I toyed around with using a separate "directory" for preparing the commit files, so that ephemeral files do not show up in the list operation at all, but some errors popped up, seemingly due to renaming into a folder that does not yet exist when creating a new table.
As is, all concurrent tests are passing, so I thought I'd ask other people's opinions.
FYI - the implementation in object_store_rs is quite different, and this problem might not show up there in that form, since there are no unwraps over there...
Perhaps more general logic (that would be acceptable in object_store_rs) would be to skip the entry if grabbing metadata resulted in NotFound?
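Roughly what that could look like inside the local file store's listing code (a minimal sketch only; metadata_or_skip is a hypothetical helper, not existing code in the crate):

    use std::io;
    use tokio::fs::DirEntry;

    // Hypothetical helper: fetch metadata for a listed entry, treating NotFound
    // as "the entry vanished between listing and this call, so skip it" and
    // propagating every other error to the caller.
    async fn metadata_or_skip(entry: &DirEntry) -> Result<Option<std::fs::Metadata>, io::Error> {
        match entry.metadata().await {
            Ok(meta) => Ok(Some(meta)),
            // e.g. a temporary commit file that was renamed away, or a vacuumed file
            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err),
        }
    }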
rust/src/storage/file/mod.rs
Outdated
// We check if the file is a temporary file created to prepare a commit.
// Getting meta data is another system call on some platforms and the file
// may have been removed in the meantime ...
let modified = if path.contains("_commit_") {
Good catch on this race condition!
I think there is still a race here because other files could be deleted through vacuuming as well. What do you think about matching on the metadata error and ignoring the path if the error matches Os { code: 2, kind: NotFound }?
ha, nvm, @wjones127 beat me to it :D
@houqp, @wjones127 - thanks for the feedback! We are now handling errors in:
    })?),
})
}
Err(err) if err.kind() == io::ErrorKind::NotFound => Err(StorageError::NotFound),
IIUC, this means we will error in the case of a temporary commit being moved. Is that what we want (versus skipping that entry)?
I still lean towards ignoring. I think it would be unexpected to ask to list_dir and get a not found error.
Just making sure we are talking about the same thing. My intent was (and I hope that's what the code is doing :D) to return an Err element in the return stream: you would be getting the "successful" stream, just that one or some of the items would be Errs. I went this way since I believe this is what object store would do.
If we agree, I'd be happy to go through some Option / filter_map dance to filter out the errors.
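For reference, that filter_map dance could look roughly like this (a sketch only; T and E stand in for the crate's ObjectMeta and StorageError types):

    use futures::{Stream, StreamExt};

    // Collect only the successfully listed entries from a stream of results,
    // silently dropping the Err elements.
    async fn ok_entries<T, E>(stream: impl Stream<Item = Result<T, E>>) -> Vec<T> {
        stream
            .filter_map(|item| async move { item.ok() })
            .collect()
            .await
    }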
I see, so the elements yielded by the stream are Result<ObjectMeta> rather than ObjectMeta. Then you could filter those for non-error elements.
You are right, the current behavior in object_store_rs (at least for list_dir()) is to return the error for that item in the stream. (list_with_delimiter errors early though, not sure if we want to fix that.)
Are we sure the list stream consumer is supposed to be able to ignore the error while iterating through the stream? If list_with_delimiter is performing early exit, then I think we should keep the behavior consistent for list as well.
Here I think we want to avoid early exit, since we will be hitting these races in our tests :). The root cause being that we need to make a separate request on Linux systems to get the modification time. object_store_rs uses walkdir - not sure if this is the same there. My "feeling" is that within the arrow world we are more often than not returning streams of results rather than objects directly.
As for list_with_delimiters - I think this is the "secondary" api. The current object store in datafusion also consistently returns result streams.
If we want early exit though, I think we have to revert to some sort of logic to capture / filter the "expected" errors we see in the tests. Not sure where Spark writes its temporary files, but if we were to write them outside of _delta_log, the concurrent commit scenario just goes away, and we "only" have to deal with the vacuum case. Of course all of this applies only to the local file system on Linux, where in practice I assume concurrent commits are much rarer than in cases where the table is backed by a cloud storage.
My personal favourite is the current solution, since a stream of results seems most common across all implementations I have seen so far.
Yeah I agree with @roeap here. The existing list_with_delimiters in object_store_rs clearly needs to be reworked.
Sorry for the confusion, I should have provided more context in my previous comment :D I was referring to catching the race error within the list api and only returning truly unrecoverable early-exit errors back to the caller. I mostly want to be extra careful on whether we are interpreting the semantics of Result<ObjectMeta> correctly in object_store_rs, so that when we switch over we won't have subtle bugs in the code, e.g. not performing early exit in the caller when we should.
To me the core of the problem is that our listing and metadata fetch code is not done in a single atomic system call. For cloud object stores, the metadata comes as part of the list api response, so it is atomic. I haven't had the time to look into walkdir yet, but if we can get atomicity through that, then we should definitely switch over 👍 The current implementation is not very scalable with a large number of files because it is very inefficient to issue one syscall per file. The next best thing is to "simulate" this atomic listing and metadata lookup operation by ignoring the not found os error within the list implementation.
That said, if the Result<ObjectMeta> in object_store_rs does include errors that can be ignored by the caller, then it would be better to return the not found error back to the caller like how you implemented it right now.
On a high level though, I do think that list and list_with_delimiters should have a consistent interface in object_store_rs, but that's a much longer conversation to be carried upstream and surely out of the scope for this change ;)
I was referring to catching the race error within the list api and only return true unrecoverable early exit errors back to the caller.
I think both our current solution and object_store_rs will return NotFound errors.
To handle this on our side, we could add a check for NotFound error here:
Lines 699 to 703 in a6ebbaf
let mut stream = self.storage.list_objs(&self.log_uri).await?;
while let Some(obj_meta) = stream.next().await {
    // Exit early if any objects can't be listed.
    let obj_meta = obj_meta?;
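A sketch of what such a check might look like while consuming the stream (illustrative only, adapted from the lines quoted above rather than taken from the PR):

    let mut stream = self.storage.list_objs(&self.log_uri).await?;
    while let Some(obj_meta) = stream.next().await {
        let obj_meta = match obj_meta {
            Ok(meta) => meta,
            // Tolerate entries that were removed (e.g. vacuumed) between listing and now.
            Err(StorageError::NotFound) => continue,
            // Exit early on any other listing error.
            Err(err) => return Err(err.into()),
        };
        // ... process obj_meta as before ...
    }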
haven't had the time to look into walkdir yet, but if we can get atomicity through that, then we should definitely switch over
I don't think we can get atomicity through that, unfortunately. It seems to just call fs::metadata on each entry internally.
The next best thing is to "simulate" this atomic listing and metadata lookup operation by ignoring not found os error within the list implementation.
I'm open to this in the future.
I went through all the call sites of list_objs to see how we would handle errors in individual elements. Also added the error handling suggested by @wjones127 above.
- cleanup_expired_logs_for -> always fail, b/c we don't know if we are actually already cleaning the log.
- get_earliest_delta_log_version -> we may be vacuuming and the earliest version might have changed. (used only for history)
- find_latest_check_point_for_version -> allow not found errors, since checkpoints should never be deleted?
- vacuum -> fail, since we may already be vacuuming.
We will always fail early if something happens on the list itself. None of this covers a scenario where someone might be messing with the logs through some other means - i.e. not a delta client. But I feel that if someone decides to do that, it is at their own risk :).
While the handling strategies above might not be affected, I feel it would be a little more robust to keep the temporary files outside the _delta_log path. This would also avoid potential artefacts cluttering the log in case of failed / crashing commits where we may not get to clean up after the failure. If we want to go that route I'd be happy to open a follow-up PR.
Generally though, this discussion led me to wonder whether we would need to support something like "concurrent vacuum". To some extent I feel these maintenance operations are outside normal operations, and owners / maintainers of a data asset should be careful enough in their work not to do such things. I.e. we mainly need to concern ourselves with the commit scenarios we are discussing here.
I think in that case, if we move the tmp files out of the log dir, we could always fail on any error. Scenarios where we want to access a listed file that has since been deleted would need to be retried by the caller anyhow.
I think both our current solution and object_store_rs will return NotFound errors.
Yes, I was trying to understand whether this is the intended behavior in object_store_rs. There is also the chance that it was an oversight ;)
get_earliest_delta_log_version -> we may be vacuuming and the earliest version might have changed. (used only for history)
If the earliest version got changed due to compacting, shouldn't we ignore it and return the 2nd earliest version from the list? I think the really annoying part is that a lot of the list_objs calls are not even using the metadata but only the path, yet we are forced to incur this overhead and race condition everywhere :D
Generally though, this discussion led me to wonder whether we would need to support something like "concurrent vacuum".
I think in that case, if we move the tmp files out of the log dir, we could always fail on any error. Scenarios where we want to access a listed file that has since been deleted would need to be retried by the caller anyhow.
I think this is a fair assumption. This is actually the main reason why delta has the tombstone retention threshold set to 7 days by default, so we have a long enough retention period to avoid such read/vacuum race conditions.
Thanks @roeap and @wjones127 for the great discussion!
Description
This PR addresses reading the delta log when some of the log has been removed and no _last_checkpoint file is present.
The original issue triggering this was related to service principal authentication with Azure. After recreating everything locally, I was able to read tables from Azure with service principals without problems. From the discussion in #600 and #609 I gathered that this may be related to how we find an entry point into parsing the log - i.e. relying on either a _last_checkpoint file or version 0 being present. Looking at some of our tables created with Spark and reading the specs, I think a _last_checkpoint file is not mandatory.
With that I also needed to change how history is resolved when the limit exceeds the available log files. Here I opted for finding the lowest log entry, but not reading any checkpoint to "fill up" to the requested limit, the main reason being that I did not want to dive into (if even possible) figuring out the versions from the data in the checkpoint file, since they are reconciled.
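To illustrate the "lowest log entry" part: commit files in _delta_log are named with a zero-padded version number, so the earliest available version can be derived from the listed file names alone. A minimal sketch of that idea (the helper is hypothetical, not the code in this PR):

    // Derive the lowest available commit version from _delta_log file names,
    // ignoring anything that is not a plain commit file (e.g. _last_checkpoint
    // or checkpoint parquet files).
    fn earliest_version_from_names<'a>(names: impl Iterator<Item = &'a str>) -> Option<i64> {
        names
            .filter_map(|name| name.strip_suffix(".json"))
            .filter_map(|stem| stem.parse::<i64>().ok())
            .min()
    }

    // earliest_version_from_names(
    //     ["00000000000000000004.json", "00000000000000000005.json", "_last_checkpoint"].into_iter(),
    // ) == Some(4)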
Related Issue(s)
maybe-closes #600
Documentation