Improve to_arrow_batch_reader performance + use to_arrow_batch_reader in upsert to lower memory pressure #1995
base: main
Conversation
…M when updating large tables
…all filter expressions. Prevents memory pressure due to large filters
fwiw I think we should try to get this merged in at some point. Some ideas:
I've been thinking about what I (as a developer) want. The answer is: set max memory usage. Some ideas:
Did an update and ran a quick benchmark with different concurrent_tasks values:

    import tqdm
    import pyarrow as pa

    # Assuming the memory being measured is PyArrow's default pool.
    pool = pa.default_memory_pool()

    # catalog: an already-loaded Iceberg catalog from the author's setup
    table = catalog.get_table("some_table")

    # Benchmark loop
    p = table.scan().to_arrow_batch_reader(concurrent_tasks=100)
    for batch in tqdm.tqdm(p):
        print(pool.max_memory())

Results (including …):
pyiceberg/io/pyarrow.py (outdated)
    return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

    if concurrent_tasks is not None:
        with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
Rather than create your own threadpool executor here, I think you should use the ExecutorFactory defined elsewhere in the repo. It has a get_or_create method that prevents creating a new threadpool on every call, among other things.
Ah thanks! Had this changed but forgot to push. I only need to make sure I get a pool with the correct max_workers set; I can't just use the regular get_or_create, as that might have an incorrect number of workers.
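For illustration, here is one way the trade-off being discussed could look; this is only a sketch, the helper name `_executor_for` is made up, and the `ExecutorFactory` import path is assumed rather than taken from this PR:

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional

from pyiceberg.utils.concurrent import ExecutorFactory  # assumed import path


def _executor_for(concurrent_tasks: Optional[int]) -> Executor:
    """Hypothetical helper: reuse the shared executor unless the caller
    asked for a specific worker count for this scan."""
    if concurrent_tasks is None:
        # Shared pool, created once and sized by the library-wide max-workers setting.
        return ExecutorFactory.get_or_create()
    # Dedicated pool honouring concurrent_tasks; the caller is responsible for
    # shutting it down (e.g. by wrapping it in a `with` block).
    return ThreadPoolExecutor(max_workers=concurrent_tasks)
```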
pyiceberg/io/pyarrow.py (outdated)
    for batches in executor.map(
        lambda task: list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)), tasks
    ):
        for batch in batches:
            current_batch_size = len(batch)
            if self._limit is not None and total_row_count + current_batch_size >= self._limit:
                yield batch.slice(0, self._limit - total_row_count)

                # This break will also cancel all running tasks
                limit_reached = True
                break

            yield batch
            total_row_count += current_batch_size

        if limit_reached:
            break
Does this preserve the ordering still? It looks like it did in to_table
Yes. Executor.map maintains ordering by default: it first submits all jobs and then waits for the results in their original order.
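As a quick illustration (not part of this PR) of that ordering guarantee:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_identity(x: int) -> int:
    # Earlier inputs take longer, so later tasks finish first...
    time.sleep((5 - x) * 0.05)
    return x


with ThreadPoolExecutor(max_workers=5) as executor:
    # ...but map still yields results in the original input order.
    print(list(executor.map(slow_identity, range(5))))  # [0, 1, 2, 3, 4]
```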
🤩
Did another update to get rid of the …. I also refactored ….
Summary
This PR updates the upsert logic to use batch processing. The main goal is to prevent out-of-memory (OOM) issues when updating large tables by avoiding loading all data at once.
Note: This has only been tested against the unit tests—no real-world datasets have been evaluated yet.
This PR partially depends on functionality introduced in #1817.
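To make the idea concrete, here is a rough sketch of batch-wise change detection; it is not the code in this PR, and `get_rows_to_update` is passed in as a stand-in for the real upsert helper:

```python
from typing import Callable, List

import pyarrow as pa
from pyiceberg.expressions import BooleanExpression
from pyiceberg.table import Table


def collect_changed_rows(
    table: Table,
    source_table: pa.Table,
    match_filter: BooleanExpression,
    get_rows_to_update: Callable[[pa.Table, pa.Table], pa.Table],
) -> List[pa.Table]:
    """Sketch: stream matching target rows batch by batch instead of
    materializing the whole scan, so peak memory stays bounded."""
    changed_batches: List[pa.Table] = []
    for target_batch in table.scan(row_filter=match_filter).to_arrow_batch_reader():
        # Compare the incoming source rows against this batch of existing rows
        # and keep only the rows whose values actually differ.
        changed = get_rows_to_update(source_table, pa.Table.from_batches([target_batch]))
        if changed.num_rows > 0:
            changed_batches.append(changed)
    return changed_batches
```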
Notes
All data is read sequentially, which may be slower than the parallel read used by to_arrow — fixed using the concurrent_tasks parameter.
Performance Comparison
In setups with many small files, network and metadata overhead become the dominant factor. This impacts batch reading performance, as each file contributes relatively more overhead than payload. In the test setup used here, metadata access was the largest cost.
Using to_arrow_batch_reader (sequential):
Using to_arrow (parallel):