Improve to_arrow_batch_reader performance + use to_arrow_batch_reader in upsert to lower memory pressure #1995

Open · wants to merge 17 commits into main

Conversation

@koenvo (Contributor) commented May 13, 2025

Summary

This PR updates the upsert logic to use batch processing. The main goal is to prevent out-of-memory (OOM) issues when updating large tables by avoiding loading all data at once.

Note: This has only been tested against the unit tests—no real-world datasets have been evaluated yet.

This PR partially depends on functionality introduced in #1817.
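
As a rough illustration of the direction (not the exact code in this PR), consuming the scan through to_arrow_batch_reader keeps only one RecordBatch in memory at a time; the catalog and table names below are placeholders:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")            # hypothetical catalog name
table = catalog.load_table("db.some_table")  # hypothetical table identifier

# Stream the existing rows instead of materializing them all with to_arrow().
reader = table.scan().to_arrow_batch_reader()

total_rows = 0
for batch in reader:  # each item is a pyarrow.RecordBatch
    total_rows += batch.num_rows
    # match the batch against the incoming data here, then let it go out of scope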


Notes

  • Duplicate detection across multiple batches is not possible with this approach.
  • All data is read sequentially, which may be slower than the parallel read used by to_arrow. (Addressed by adding a concurrent_tasks parameter.)

Performance Comparison

In setups with many small files, network and metadata overhead become the dominant factor. This impacts batch reading performance, as each file contributes relatively more overhead than payload. In the test setup used here, metadata access was the largest cost.

Using to_arrow_batch_reader (sequential):

  • Scan: 9993.50 ms
  • To list: 19811.09 ms

Using to_arrow (parallel):

  • Scan: 10607.88 ms

@jayceslesar (Contributor) commented:

fwiw I think we should try to get this merged in at some point. Some ideas:

  1. Make it a flag whether to use the batch reader or not; some users might have basically infinite memory.
  2. Is there a more optimal way to batch data? Thinking along the lines of using partitions, although that may already happen under the hood.

@koenvo (Contributor, Author) commented Jun 2, 2025

> fwiw I think we should try to get this merged in at some point. Some ideas:
>
>   1. Make it a flag whether to use the batch reader or not; some users might have basically infinite memory.
>   2. Is there a more optimal way to batch data? Thinking along the lines of using partitions, although that may already happen under the hood.

I've been thinking about what I (as a developer) want. The answer: being able to set a maximum memory usage.

Some ideas:

  1. Determine which partitions can fit together in memory and batch-load those together (see the sketch after this list)
  2. Fetch the Parquet files in parallel and only do the loading sequentially
  3. Combine 1 and 2
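
Purely as an illustration of idea 1, here is a rough sketch that groups the scan's planned file tasks under a byte budget; the budget value is made up, and compressed file size is only a proxy for decoded in-memory size:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.some_table")  # hypothetical names

MAX_GROUP_BYTES = 512 * 1024 * 1024  # hypothetical 512 MB budget per group

# Group scan tasks so each group's summed (compressed) file size stays under the budget.
groups, current, current_bytes = [], [], 0
for task in table.scan().plan_files():
    size = task.file.file_size_in_bytes
    if current and current_bytes + size > MAX_GROUP_BYTES:
        groups.append(current)
        current, current_bytes = [], 0
    current.append(task)
    current_bytes += size
if current:
    groups.append(current)
# Each group could then be loaded and processed as one in-memory chunk.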

@koenvo (Contributor, Author) commented Jun 2, 2025

Did an update and ran a quick benchmark with different concurrent_tasks settings on to_arrow_batch_reader():

import tqdm
import pyarrow as pa

pool = pa.default_memory_pool()
table = catalog.load_table("some_table")  # catalog: a previously configured pyiceberg catalog

# Benchmark loop
p = table.scan().to_arrow_batch_reader(concurrent_tasks=100)
for batch in tqdm.tqdm(p):
    print(pool.max_memory())

Results (including pool.max_memory()):

  • concurrent_tasks=1: 52it [00:06, 7.73it/s] | Max memory: 7.4 MB
  • concurrent_tasks=10: 391it [00:06, 61.98it/s] | Max memory: 36.3 MB
  • concurrent_tasks=20: 1412it [00:15, 83.54it/s] | Max memory: 147 MB
  • concurrent_tasks=100: 1030it [00:09, 106.84it/s] | Max memory: 1.76 GB

Some more testing (on a 100 Mbit connection):

scan.to_arrow_batch_reader(concurrent_tasks=10)
2025-06-03 11:02:48.986 INFO Starting
2025-06-03 11:05:10.927 INFO Rows: 13584102
2025-06-03 11:05:10.927 INFO Memory usage: 78.4MB

scan.to_arrow()
2025-06-03 11:05:47.211 INFO Starting
2025-06-03 11:08:09.907 INFO Rows: 13584102
2025-06-03 11:08:09.907 INFO Memory usage: 11GB

Note: Performance also depends on the network connection.

return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

if concurrent_tasks is not None:
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
Review comment:

Rather than create your own threadpool executor here, I think you should use the ExecutorFactory defined elsewhere in the repo. It has a get_or_create method that prevents creating a new threadpool on every call, among other things.
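
For reference, a minimal sketch of what that suggestion looks like from the call site, assuming the factory lives in pyiceberg.utils.concurrent (the placeholder workload is only there to show the call shape):

from pyiceberg.utils.concurrent import ExecutorFactory

# get_or_create() hands back a single shared ThreadPoolExecutor sized from the
# "max-workers" config entry, instead of building a new pool on every call.
executor = ExecutorFactory.get_or_create()

results = list(executor.map(lambda n: n * n, range(10)))  # placeholder workload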

@koenvo (Contributor, Author) replied Jun 3, 2025:

Ah thanks! Had this changed but forgot to push. Only need to make sure I get a pool with the correct max_workers set. Can't just use the regular get_or_create as that might have an incorrect number of workers.

@koenvo koenvo marked this pull request as ready for review June 3, 2025 09:59
@koenvo koenvo changed the title Use batchreader in upsert Improve to_arrow_batch_reader performance + use to_arrow_batch_reader in upsert to lower memory pressure Jun 3, 2025
Comment on lines 1615 to 1631
for batches in executor.map(
    lambda task: list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)), tasks
):
    for batch in batches:
        current_batch_size = len(batch)
        if self._limit is not None and total_row_count + current_batch_size >= self._limit:
            yield batch.slice(0, self._limit - total_row_count)

            # This break will also cancel all running tasks
            limit_reached = True
            break

        yield batch
        total_row_count += current_batch_size

    if limit_reached:
        break
Review comment (Contributor):

Does this preserve the ordering still? It looks like it did in to_table

@koenvo (Contributor, Author) replied:

Yes. Executor.map maintains ordering by default: it submits all jobs first, then waits for the results in the original order.
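
A small standalone demonstration of that property (plain concurrent.futures, unrelated to the PR code):

import time
from concurrent.futures import ThreadPoolExecutor

def slow_identity(n: int) -> int:
    # Later inputs finish first, but map() still yields results in input order.
    time.sleep(0.5 - n * 0.1)
    return n

with ThreadPoolExecutor(max_workers=5) as executor:
    print(list(executor.map(slow_identity, range(5))))  # -> [0, 1, 2, 3, 4]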

Reply (Contributor):

🤩

@koenvo (Contributor, Author) commented Jun 3, 2025

Did another update to get rid of the concurrent_tasks argument. It now defaults to the max-workers config setting.

I also refactored to_arrow to use to_arrow_batch_reader under the hood to prevent duplicate implementations of the same functionality.
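
Conceptually, the relationship between the two code paths now looks like this (a sketch only; whether the internals use RecordBatchReader.read_all() or pa.Table.from_batches is not shown in this conversation):

import pyarrow as pa
from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.some_table")  # hypothetical names

# to_arrow() now effectively drains the same streaming reader into one table.
batched: pa.Table = table.scan().to_arrow_batch_reader().read_all()
materialized: pa.Table = table.scan().to_arrow()
assert batched.num_rows == materialized.num_rows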
