Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: buffered reading of transaction logs #1549

Merged
merged 32 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1d70da0
initial buffered log reading
eeroel Jul 20, 2023
dbd0bc4
another attempt
eeroel Jul 20, 2023
cbf4d10
limit reads to max version
eeroel Jul 20, 2023
fa1b641
clean up
eeroel Jul 20, 2023
834161c
attempt to refactor
eeroel Jul 20, 2023
e15e37a
some revisions
eeroel Jul 20, 2023
0c3d28e
handle stream length explicitly
eeroel Jul 20, 2023
e38d83e
fix conversion error handling, small tweaks
eeroel Jul 20, 2023
a402654
remove an unwrap
eeroel Jul 21, 2023
83d6881
initial config, test
eeroel Jul 21, 2023
8b30341
refactor test
eeroel Jul 22, 2023
741f137
expose to python
eeroel Jul 22, 2023
6381c90
clean docs
eeroel Jul 22, 2023
8bade70
expose to python
eeroel Jul 22, 2023
69467ae
end of stream handling, more testing
eeroel Jul 23, 2023
fc8e990
add test, limit for log buffer size
eeroel Jul 23, 2023
96fe14f
list files to get latest version
eeroel Jul 23, 2023
3e97d30
list from current
eeroel Jul 25, 2023
d6da3bf
fix initializer in bench code
eeroel Jul 25, 2023
e7e120d
use listing for get_latest_version
eeroel Jul 25, 2023
da3dbf1
clean up
eeroel Jul 25, 2023
32db9ec
some revisions
eeroel Jul 25, 2023
e22a687
use dynamic data for test
eeroel Jul 26, 2023
8549127
some revisions
eeroel Jul 27, 2023
7f7ee6f
update test file name
eeroel Aug 7, 2023
eb386c0
update python doc
eeroel Sep 5, 2023
fb3c8dd
format
eeroel Sep 5, 2023
987e2a0
satisfy linter
eeroel Sep 5, 2023
e57e020
more formatting
eeroel Sep 5, 2023
5dfdef4
rename builder method
eeroel Sep 5, 2023
41cfafb
yet more formatting
eeroel Sep 5, 2023
59e97cd
Merge branch 'main' into feat/buffered_log_reading
wjones127 Sep 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
end of stream handling, more testing
  • Loading branch information
eeroel committed Sep 8, 2023
commit 69467ae72b921621d9f962b487bf3027d09ef5fa
20 changes: 13 additions & 7 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,15 @@ impl DeltaTable {
self.version(),
);

let max_version = max_version.and_then(|x| {
if x <= self.version() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtyler I refactored the max_version handling a bit so that it's clearer. Here I made explicit the behavior that is currently on main, i.e. the table will be updated to the latest version if max_version is smaller than or equal to the current version.

// update to latest version
None
} else {
Some(x)
}
});

let buf_size = self.config.log_buffer_size;
if buf_size == 0 {
return Err(DeltaTableError::Generic(String::from("Log buffer size cannot be zero!")))
Expand All @@ -572,10 +581,7 @@ impl DeltaTable {
log_stream.take(usize::MAX).buffered(buf_size)
}
Some(n) => {
if n - self.version() < 0 {
return Err(DeltaTableError::Generic(String::from("Table version is greater than max_version!")));
}
let n_commits = usize::try_from(n - self.version() + 2);
let n_commits = usize::try_from(n - self.version());
match n_commits {
Ok(n) => log_stream.take(n).buffered(buf_size),
Err(err) => return Err(DeltaTableError::GenericError { source: Box::new(err) })
Expand All @@ -588,9 +594,9 @@ impl DeltaTable {
let next_commit = log_buffer.next().await;
match next_commit {
Some((v, Ok(x))) => Ok(Some((v, self.get_actions(v, x.bytes().await?).await?))),
Some((_, Err(ObjectStoreError::NotFound { .. }))) => Ok(None),
Some((_, Err(err))) => Err(DeltaTableError::GenericError { source: Box::new(err) }), // TODO ??
None => Err(DeltaTableError::Generic(String::from("Log stream closed unexpectedly!")))
Some((_, Err(ObjectStoreError::NotFound { .. }))) => Ok(None), // no more files in the log
Some((_, Err(err))) => Err(DeltaTableError::GenericError { source: Box::new(err) }),
None => Ok(None) // reached end of stream at max_version
}
}?
{
Expand Down
27 changes: 27 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,33 @@ async fn test_log_buffering() {
assert_eq!(buf_version, 10);
}

/// Checks that `update_incremental` ends up at the expected table version for a
/// range of `max_version` arguments, for several log buffer sizes.
///
/// Every case starts from a fresh table loaded at version 0, so the cases do
/// not influence each other.
#[tokio::test]
async fn test_log_buffering_success_explicit_version() {
    let table_uri = "./tests/data/simple_table_with_no_checkpoint";

    // (`max_version` handed to `update_incremental`, version expected afterwards)
    let cases = [
        // no limit
        (None, 10),
        // limit equal to the version the table was loaded at
        (Some(0), 10),
        // limit inside the log
        (Some(1), 1),
        // limit equal to the version reached by the unlimited update
        (Some(10), 10),
        // limit above the version reached by the unlimited update
        (Some(20), 10),
    ];

    for log_buffer in [1, 2, 10, 100] {
        for (max_version, expected_version) in cases {
            let mut table = DeltaTableBuilder::from_uri(table_uri)
                .with_version(0)
                .with_buffer(log_buffer)
                .load()
                .await
                .unwrap();

            table.update_incremental(max_version).await.unwrap();

            assert_eq!(table.version(), expected_version);
        }
    }
}

#[tokio::test]
async fn test_read_vacuumed_log() {
let path = "./tests/data/checkpoints_vacuumed";
Expand Down