feat(filesystem): stream Iceberg merge/upsert with batched reads and GC interval#1
Draft
ajinzrathod-tmdc wants to merge 13 commits into
Draft
feat(filesystem): stream Iceberg merge/upsert with batched reads and GC interval#1ajinzrathod-tmdc wants to merge 13 commits into
ajinzrathod-tmdc wants to merge 13 commits into
Conversation
Collaborator
Author
|
P.S: Every operation was performed on memory limit of 4GB with batch size of 10000. |
…ceberg merge engine production-ready
…t_forward and added support for multiple cloud providers
…tproof classpath In Spark local[*] mode, spark.executor.extraClassPath set programmatically via SparkConf is silently ignored — the local executor reuses the driver JVM but takes its classpath from the launcher, not from the conf. This caused HadoopFileIO to fail with ClassNotFoundException for SecureAzureBlobFileSystem on the executor side even when driver-side class loading worked. Fix: symlink (or copy) cached jars into \$SPARK_HOME/jars/ before SparkSession creation. That directory is loaded by Spark's launcher onto the JVM system classloader BEFORE any user code, so every subsequent classloader (driver, executor, Hadoop's Configuration.classLoader) sees the classes unconditionally. Falls back to spark.jars + extraClassPath + --jars when \$SPARK_HOME/jars is not writable (e.g. read-only filesystem). Co-authored-by: Cursor <cursoragent@cursor.com>
…arquet batch via env
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Streams Iceberg loads on the filesystem destination so large Parquet inputs are not fully materialized in memory. Merge/upsert and append/replace paths consume Arrow data as RecordBatchReader / batched scans, with optional periodic gc.collect() to cap memory during long runs.
Tests
tests/load/pipeline/test_open_table_pipeline.py: expanded coverage for the streaming Iceberg load / open-table pipeline behavior (per commit).
This is the snapshot after following operations:
Each append/replace operation took ~7.5 mins (55GB of data). Peak RSS recorded: ~2800 MB
Upsert numbers (against current snapshot pointing to 55GB of data)
FWIW, this can be made significantly faster with a SQL-driven approach: I tested a three-step pipeline, (1) scan the table’s primary keys into SQLite, (2) identify only the Iceberg data files impacted by the incoming update Parquet files, and (3) apply updates via PyIceberg on just that planned subset, and the full run completed in ~10 minutes for all touched files with ~900 MB of update Parquet, delivering much lower runtime and memory usage compared to the streamed upsert approach for comparable work.