Skip to content

feat(filesystem): stream Iceberg merge/upsert with batched reads and GC interval#1

Draft
ajinzrathod-tmdc wants to merge 13 commits into
1.21.0-mergedfrom
feat/iceberg-streaming-atomic-commit
Draft

feat(filesystem): stream Iceberg merge/upsert with batched reads and GC interval#1
ajinzrathod-tmdc wants to merge 13 commits into
1.21.0-mergedfrom
feat/iceberg-streaming-atomic-commit

Conversation

@ajinzrathod-tmdc

@ajinzrathod-tmdc ajinzrathod-tmdc commented Apr 8, 2026

Copy link
Copy Markdown
Collaborator

Summary
Streams Iceberg loads on the filesystem destination so large Parquet inputs are not fully materialized in memory. Merge/upsert and append/replace paths consume Arrow data as RecordBatchReader / batched scans, with optional periodic gc.collect() to cap memory during long runs.

Tests
tests/load/pipeline/test_open_table_pipeline.py: expanded coverage for the streaming Iceberg load / open-table pipeline behavior (per commit).

This is the snapshot after following operations:

  1. Replace 500k records [1]
  2. Append 500k records [2]
  3. Replace 500k records [3, 4]

Each append/replace operation took ~7.5 mins (55GB of data). Peak RSS recorded: ~2800 MB

image image

Upsert numbers (against current snapshot pointing to 55GB of data)

  • 1 file (256 MB): ~17 minutes, ~2.2 GB RSS
image
  • 4 files (256 MB each): ~1.5 hours, ~4 GB RSS
image

FWIW, this can be made significantly faster with a SQL-driven approach: I tested a three-step pipeline, (1) scan the table’s primary keys into SQLite, (2) identify only the Iceberg data files impacted by the incoming update Parquet files, and (3) apply updates via PyIceberg on just that planned subset, and the full run completed in ~10 minutes for all touched files with ~900 MB of update Parquet, delivering much lower runtime and memory usage compared to the streamed upsert approach for comparable work.

@ajinzrathod-tmdc

Copy link
Copy Markdown
Collaborator Author

P.S: Every operation was performed on memory limit of 4GB with batch size of 10000.

@ajinzrathod-tmdc ajinzrathod-tmdc marked this pull request as draft April 9, 2026 06:24
ajinzrathod-tmdc and others added 12 commits April 11, 2026 01:45
…t_forward and added support for multiple cloud providers
…tproof classpath

In Spark local[*] mode, spark.executor.extraClassPath set programmatically
via SparkConf is silently ignored — the local executor reuses the driver
JVM but takes its classpath from the launcher, not from the conf. This
caused HadoopFileIO to fail with ClassNotFoundException for
SecureAzureBlobFileSystem on the executor side even when driver-side
class loading worked.

Fix: symlink (or copy) cached jars into \$SPARK_HOME/jars/ before
SparkSession creation. That directory is loaded by Spark's launcher onto
the JVM system classloader BEFORE any user code, so every subsequent
classloader (driver, executor, Hadoop's Configuration.classLoader) sees
the classes unconditionally.

Falls back to spark.jars + extraClassPath + --jars when \$SPARK_HOME/jars
is not writable (e.g. read-only filesystem).

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant