
Large Memory Spike At Checkpoint #2628

Open
rob-harrison opened this issue Jun 27, 2024 · 8 comments
Labels
bug Something isn't working

Comments

@rob-harrison

Environment

Delta-rs version: deltalake==0.18.1

Binding: python (rust engine)

Environment:

Cloud provider: AWS
OS: Mac 10.15.7 and Debian GNU/Linux 12 (bookworm)
Other:


Bug

What happened:
We have a Kafka consumer appending batches of records (±6k) to Delta using write_deltalake.
Normal working memory is ±600Mi; however, when Delta attempts to write its checkpoint every 100 batches, working memory spikes to ±7Gi (see the Grafana screenshot below).

What you expected to happen:
We would expect only a small increase in working memory during checkpoint creation (as it should only need the last checkpoint + transaction logs - right?).
We would appreciate any insight into why this might be happening, and any options to alleviate it.

How to reproduce it:

from typing import List

from deltalake import DeltaTable, write_deltalake

dt = DeltaTable(table_path, storage_options=storage_options)
...

def append_heartbeats(self, events: List[Event]):
    # append heartbeats to datastore
    batch = build_pyarrow_batch(events=events)
    if batch.num_rows > 0:
        write_deltalake(
            dt,
            batch,
            partition_by="date",
            mode="append",
            engine="rust",
            large_dtypes=False,
        )

More details:
[Grafana screenshot (2024-06-27): working memory spiking to ±7Gi during checkpoint]

rob-harrison added the bug label on Jun 27, 2024
@sherlockbeard
Contributor

sherlockbeard commented Jun 30, 2024

We would expect only a small increase in working memory during checkpoint creation (as it should only need the last checkpoint + transaction logs - right?).

Yes: checkpoint + transaction logs + unexpired deleted files (tombstones).

Are your transaction logs very big? What's your last checkpoint size?
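
If it helps, one way to check both from Python (a rough sketch, not tested against your setup; it assumes fsspec with the s3fs backend and uses a placeholder table path):

import json
import fsspec

# List the _delta_log file sizes and read the _last_checkpoint pointer.
fs = fsspec.filesystem("s3")                          # assumes AWS credentials are in the environment
log_dir = "s3://your-bucket/your-table/_delta_log"    # placeholder path
for entry in fs.ls(log_dir, detail=True):
    print(entry["name"], entry["size"])
with fs.open(f"{log_dir}/_last_checkpoint") as f:
    print(json.load(f))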

@rob-harrison
Author

rob-harrison commented Jul 1, 2024

Are your transaction logs very big? What's your last checkpoint size?

Transaction logs are ±15k each.
We do, however, have a lot of them, if that's significant (±400k; see the checkpoint below).

2024-07-01 10:26:56      15507 00000000000000396955.json
2024-07-01 10:31:15      15409 00000000000000396956.json
2024-07-01 10:35:31      17004 00000000000000396957.json
2024-07-01 10:39:26      13652 00000000000000396958.json
2024-07-01 10:43:26      15457 00000000000000396959.json
2024-07-01 10:47:13      15504 00000000000000396960.json

last checkpoint size is ±155Mi

{"size":336792,"size_in_bytes":162615157,"version":396599}

@sherlockbeard
Contributor

Hmm, super weird.

Maybe @rtyler can help you more.

@rtyler
Member

rtyler commented Jul 1, 2024

ohno.jpeg

😄 This might be easier to pair on over Slack if you're open to it @rob-harrison, since I have some questions about the contents of some of those transaction logs which I doubt you'd want to answer in a public forum. If there are a lot of actions being aggregated together and checkpoints are few and far between, I can imagine that requiring some hefty memory to compute, since we have to load everything into memory.

Additionally, if the table hasn't been optimized in a while, that can also increase memory usage with lots of small transactions.

@jfrancisco0

Following up on the issue reported by @rob-harrison, after talking with @rtyler, we started consistently optimising the table once a day in Spark/Databricks, and it greatly improved memory usage, allowing us to allocate less memory to the service.

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, 's3://path/')
deltaTable.optimize().executeCompaction()  # compact small files
deltaTable.vacuum(744)                     # remove unreferenced files older than 744 hours (31 days)
[Screenshot (2024-07-09): memory usage after enabling daily optimisation]

Any improvements to write efficiency would be greatly appreciated, and we'll make sure to keep delta-rs up to date to reap the benefits from any possible improvement. However, with frequent optimisation and compaction, we're in a much better place.

Thank you for the help!

@sherlockbeard
Contributor

Hey, just a few questions:

engine="rust",

Q: Was there any performance difference between the pyarrow engine and the rust engine in this case?

optimising, vacuum the table once a day in Spark/Databricks

Q: You can do the same from delta-rs as well; any special reason for Spark?
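
For example, a minimal sketch of the same maintenance with deltalake's Python API (the table path and storage_options are placeholders):

from deltalake import DeltaTable

dt = DeltaTable("s3://path/", storage_options=storage_options)  # placeholder path/options
dt.optimize.compact()                          # compact small files, like executeCompaction()
dt.vacuum(retention_hours=744, dry_run=False)  # remove unreferenced files older than 744 hours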

@jfrancisco0

@sherlockbeard I did not test using the pyarrow engine, so I can't tell.

Regarding the second question, initially that is what we did: run an optimiser cronjob. However, this had the same problem as the writer; it required a lot of memory to run and stalled, errored out, or got OOMKilled. Our Spark cluster has more resources allocated, so I don't know whether memory usage was comparable (we weren't monitoring it).

So basically, we're running on Spark to separate the concerns and make sure the optimisation succeeds. Also, @rtyler recommended this separation; I don't know if he has stronger reasons.

@esarili
Contributor

esarili commented Aug 16, 2024

We have a similar issue as well. Our checkpoint files have reached 1.4 GB.

Here are the metadata files between two checkpoints:

 1530706578 00000000000000031230.checkpoint.parquet
     199174 00000000000000031230.json
    6826951 00000000000000031231.json
     431530 00000000000000031232.json
    6763451 00000000000000031233.json
     191128 00000000000000031234.json
     133161 00000000000000031235.json
      99406 00000000000000031236.json
    3973176 00000000000000031237.json
     108031 00000000000000031238.json
      99887 00000000000000031239.json
    6038682 00000000000000031240.json
     149010 00000000000000031241.json
     414978 00000000000000031242.json
    6929821 00000000000000031243.json
     199073 00000000000000031244.json
     190966 00000000000000031245.json
    7213536 00000000000000031246.json
     149519 00000000000000031247.json
     150066 00000000000000031248.json
     132726 00000000000000031249.json
    7370821 00000000000000031250.json
     116507 00000000000000031251.json
     157263 00000000000000031252.json
     150174 00000000000000031253.json
    7384264 00000000000000031254.json
     149712 00000000000000031255.json
     165866 00000000000000031256.json
     116819 00000000000000031257.json
    7352278 00000000000000031258.json
     150215 00000000000000031259.json
 1512187810 00000000000000031260.checkpoint.parquet

Metadata for checkpoint:

>  parq 00000000000000031260.checkpoint.parquet

 # Metadata
 <pyarrow._parquet.FileMetaData object at 0x1307293a0>
  created_by: parquet-rs version 50.0.0
  num_columns: 312
  num_rows: 860305
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 109441

Our memory usage jumped to 13.1 GB when creating the checkpoint:
[Screenshot: memory usage peaking at 13.1 GB during checkpoint creation]

Most of the memory allocations happen in the deltalake_core::protocol::checkpoints::parquet_bytes_from_state function, through arrow_json::reader::Decoder::serialize calls.

We have two theories for the moment:

  1. We run compaction every two hours and remove 13 files on average during each compaction. Although we run vacuum after each compaction, we still use the default 7-day vacuum retention. This might leave a lot of tombstones in the log. AFAICT from the checkpointing code, tombstones are also loaded when creating a checkpoint (a quick way to count them is sketched below).
  2. The following two lines might be loading the entire parquet file contents into memory at once:
    https://github.com/delta-io/delta-rs/blob/main/crates/core/src/protocol/checkpoints.rs#L351-L352
    Especially considering the serialize method here:
    https://github.com/apache/arrow-rs/blob/50.0.0/arrow-json/src/reader/mod.rs#L598-L600
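
To sanity-check theory 1, here is a small sketch (assuming pyarrow is installed and the checkpoint file from the listing above has been downloaded locally) that counts how many rows of the checkpoint are remove actions, i.e. tombstones:

import pyarrow.parquet as pq

# Read only the 'remove' column of the checkpoint; every non-null row is a tombstone.
checkpoint = pq.read_table("00000000000000031260.checkpoint.parquet", columns=["remove"])
total_actions = checkpoint.num_rows
tombstones = total_actions - checkpoint.column("remove").null_count
print(f"{tombstones} remove actions (tombstones) out of {total_actions} total actions")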
