
Parquet: close zstd input stream early to avoid memory pressure #5681

Merged · 2 commits · Sep 1, 2022

Conversation

@bryanck (Contributor) commented Aug 31, 2022

This PR adds a workaround for memory issues encountered when reading zstd-compressed Parquet files. During load testing on Spark, we saw various OOM kills when reading from zstd-compressed tables. One suggested mitigation was to set the environment variable MALLOC_TRIM_THRESHOLD_ to something lower than the default, like 8192. This helped in some cases but not all.

Upon further investigation, it appeared that buffers were accumulating...
[Screenshot, Aug 30 2022: heap dump showing accumulating buffers]

Disabling the buffer pool resulted in finalizers accumulating instead...
[Screenshot, Aug 30 2022: heap dump showing accumulating finalizers]

The solution is the same one being proposed in parquet-mr. The current version of Parquet leaves the decompression stream open. Instead, this PR reads the stream fully into a buffer and then closes it, allowing native resources to be freed immediately rather than waiting for garbage collection, and the buffer to be returned to the pool for reuse.

With this change, lowering MALLOC_TRIM_THRESHOLD_ should no longer be required. Anecdotally, this also performed better than the MALLOC_TRIM_THRESHOLD_ workaround, but more testing would be needed to validate that.
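The read-fully-then-close idea can be sketched as below. This is a minimal illustration of the technique, not the actual Iceberg patch; `EagerCloseDecompress` and `readFullyAndClose` are hypothetical names, and a plain byte-array stream stands in for the native zstd decompression stream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class EagerCloseDecompress {

  // Read the entire stream into a heap buffer, then close it immediately.
  // Closing eagerly releases any native resources held by the stream
  // (e.g. a zstd decompression context) without waiting for finalization.
  static byte[] readFullyAndClose(InputStream in) throws IOException {
    try (InputStream stream = in) { // try-with-resources closes on exit
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[8192];
      int n;
      while ((n = stream.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] page = "decompressed page bytes".getBytes();
    byte[] copy = readFullyAndClose(new ByteArrayInputStream(page));
    System.out.println(copy.length == page.length); // prints "true"
  }
}
```

The caller then works with an in-memory `ByteArrayInputStream` over the buffered bytes, so nothing downstream keeps the native stream alive.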

Alternatively, we could wait for the Parquet PR to be merged, but this is a more targeted fix. We could also add a flag of some sort if desired. Ideally, we would backport this to 0.14.x.

Here's a viz of the heap dump with this change...
[Screenshot, Aug 31 2022: heap dump with this change applied]

Diff excerpt under discussion:

```java
} else {
  optionsBuilder = ParquetReadOptions.builder();
}
optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0));
```
Reviewer (Contributor):

Is it safe to pass 0 for page size? How is that option used?

bryanck (Contributor, Author):

The page size isn't used for decompressors according to this comment. This is what is currently being set by default in the options builder.

bryanck (Contributor, Author):

I added comments to make that clearer.

@kbendick (Contributor), Sep 1, 2022:

Idea: Maybe making this a named constant, like UNUSED_PARQUET_DECOMPRESSOR_PAGE_SIZE or something would be a good way of indicating that?

Given this solution is temporary, I don’t have a strong feeling either way, but I do like avoiding magic numbers.

Maybe a link to the upstream parquet-mr PR or an Iceberg issue related to zstd decompression would be more informative than just mentioning that page size is essentially ignored? The decompressor class itself might be sufficient documentation though.
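That naming suggestion could look like the sketch below; the class and constant names are illustrative only, not from the Iceberg codebase:

```java
// Hypothetical sketch of the reviewer's naming suggestion.
public class CodecFactoryConstants {
  // The page-size argument is only consulted when creating compressors;
  // decompressors ignore it, so 0 is safe but deserves a self-describing name.
  public static final int UNUSED_PARQUET_DECOMPRESSOR_PAGE_SIZE = 0;
}
```

The call site would then read `new ParquetCodecFactory(new Configuration(), CodecFactoryConstants.UNUSED_PARQUET_DECOMPRESSOR_PAGE_SIZE)`, making the intent clear without a comment.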

@kbendick (Contributor) commented Sep 1, 2022

> Alternatively, we could wait for the Parquet PR to be merged, but this is a more targeted fix.

I’m +1 to patching for Iceberg until the parquet-mr release is made available. One of the benefits of controlling the parquet version and writers is that we can do these things.

Thanks @bryanck! Will review this ASAP. I've been out pretty sick, but occasionally I wake up and am able to review.

@rdblue rdblue added this to the Iceberg 0.14.1 Release milestone Sep 1, 2022
@rdblue (Contributor) left a comment:

Looks great to me. Thanks, @bryanck!

@rdblue rdblue merged commit 1f3f707 into apache:master Sep 1, 2022
rdblue pushed a commit to rdblue/iceberg that referenced this pull request Sep 2, 2022
sunchao pushed a commit to apache/spark that referenced this pull request Feb 20, 2023
…und for PARQUET-2160

### What changes were proposed in this pull request?

SPARK-41952 was raised a while ago, but unfortunately the Parquet community has not published the patched version yet; as a workaround, we can fix the issue on the Spark side first.

We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors always occupy an unreasonable amount of off-heap memory and run a high risk of being killed by the NodeManager (NM).

See more discussions at apache/parquet-java#982 and apache/iceberg#5681

### Why are the changes needed?

The issue is fixed in the Parquet community as [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160), but the patched version is not available yet.

### Does this PR introduce _any_ user-facing change?

Yes, it's a bug fix.

### How was this patch tested?

The existing UTs should cover the correctness check; I also verified this patch by scanning a large parquet/zstd table.

```
spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
```

```
spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false)
```

- before this patch

All executors get killed by NM quickly.
```
ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
```
<img width="1872" alt="image" src="https://user-images.githubusercontent.com/26535726/220031678-e9060244-5586-4f0c-8fe7-55bb4e20a580.png">

- after this patch

Query runs well, no executor gets killed.

<img width="1881" alt="image" src="https://user-images.githubusercontent.com/26535726/220031917-4fe38c07-b38f-49c6-a982-2091a6c2a8ed.png">

Closes #40091 from pan3793/SPARK-41952.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023