Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: buffered reading of transaction logs #1549

Merged
merged 32 commits into from
Sep 10, 2023
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1d70da0
initial buffered log reading
eeroel Jul 20, 2023
dbd0bc4
another attempt
eeroel Jul 20, 2023
cbf4d10
limit reads to max version
eeroel Jul 20, 2023
fa1b641
clean up
eeroel Jul 20, 2023
834161c
attempt to refactor
eeroel Jul 20, 2023
e15e37a
some revisions
eeroel Jul 20, 2023
0c3d28e
handle stream length explicitly
eeroel Jul 20, 2023
e38d83e
fix conversion error handling, small tweaks
eeroel Jul 20, 2023
a402654
remove an unwrap
eeroel Jul 21, 2023
83d6881
initial config, test
eeroel Jul 21, 2023
8b30341
refactor test
eeroel Jul 22, 2023
741f137
expose to python
eeroel Jul 22, 2023
6381c90
clean docs
eeroel Jul 22, 2023
8bade70
expose to python
eeroel Jul 22, 2023
69467ae
end of stream handling, more testing
eeroel Jul 23, 2023
fc8e990
add test, limit for log buffer size
eeroel Jul 23, 2023
96fe14f
list files to get latest version
eeroel Jul 23, 2023
3e97d30
list from current
eeroel Jul 25, 2023
d6da3bf
fix initializer in bench code
eeroel Jul 25, 2023
e7e120d
use listing for get_latest_version
eeroel Jul 25, 2023
da3dbf1
clean up
eeroel Jul 25, 2023
32db9ec
some revisions
eeroel Jul 25, 2023
e22a687
use dynamic data for test
eeroel Jul 26, 2023
8549127
some revisions
eeroel Jul 27, 2023
7f7ee6f
update test file name
eeroel Aug 7, 2023
eb386c0
update python doc
eeroel Sep 5, 2023
fb3c8dd
format
eeroel Sep 5, 2023
987e2a0
satisfy linter
eeroel Sep 5, 2023
e57e020
more formatting
eeroel Sep 5, 2023
5dfdef4
rename builder method
eeroel Sep 5, 2023
41cfafb
yet more formatting
eeroel Sep 5, 2023
59e97cd
Merge branch 'main' into feat/buffered_log_reading
wjones127 Sep 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
another attempt
  • Loading branch information
eeroel committed Sep 8, 2023
commit dbd0bc409ca2fb937db6feb37e4ea8eee1ac8655
19 changes: 14 additions & 5 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,16 +601,22 @@ impl DeltaTable {
// based on example
// https://gendignoux.com/blog/2021/04/01/rust-async-streams-futures-part1.html#ordered-buffering
// just iterate infinitely as we can check later if we hit max
/*let logstream = stream::iter(self.version()..).map(|i| {
let store = self.storage.clone();
let logstream = futures::stream::iter(self.version()..).map(|i| {
let store2 = store.clone();
//let loc = futures::future::ok(commit_uri_from_version(i+1));
let loc = Arc::new(commit_uri_from_version(i+1));
//loc.and_then({
// |_loc| self.storage.get(&_loc)
//})
async move {
self.storage.get(&loc)
let store3 = store2.clone();
let out = store3.get(&loc);
out.await
}
});*/
});

/* // this compiles but seems to be sequential
let store = self.storage.clone();
let logstream = futures::stream::unfold(self.version(), |i| {
let loc = commit_uri_from_version(i+1).clone();
Expand All @@ -624,6 +630,8 @@ impl DeltaTable {
Some((result, i+1))
}
}).boxed();
*/

// why mut?
let mut buffer = logstream.buffered(50);

Expand All @@ -634,13 +642,14 @@ impl DeltaTable {
None => todo!()
};

/*
let buf = match buf_res {
Ok(x) => x,
Err(_) => todo!()
};

let foo = buf;
match foo {
let foo = buf;*/
match buf_res {
Ok(x) => self.peek_next_commit_with_buffer(self.version(), Some(Ok(x))).await,
Err(ObjectStoreError::NotFound { .. }) => Ok(PeekCommit::UpToDate),
Err(x) => Err(DeltaTableError::GenericError { source: Box::new(x) })
Expand Down