[SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 #40091
Conversation
cc @sunchao and @parthchandra
BTW, do you happen to know why the Apache Parquet community didn't fill in the Fixed Version of that Parquet JIRA or make a release, @pan3793? According to the log, it seems to have been merged in September 2022.
Based on apache/parquet-java#982 (comment), I guess the Parquet community may think it's not a critical issue, but in my case, it's critical.
For the 3.2/3.3/3.4 branches, I am OK with this way of fixing it, and we can revert ParquetCodecFactory.java in master after upgrading Parquet to a new version.
branch-3.3/3.2 use Parquet 1.12.2; if this fix is accepted, would you mind submitting PRs for those two branches? @pan3793
Sure.
I verified this patch by scanning a large parquet/zstd table and updated the PR description.
+1, LGTM
* workaround for memory issues encountered when reading from zstd-compressed files. For
* details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
*/
public class ParquetCodecFactory extends CodecFactory {
I think we should add a TODO to remind us to revert this file when updating the Parquet version.
Updated.
@@ -34,6 +34,8 @@
 * This class implements a codec factory that is used when reading from Parquet. It adds a
 * workaround for memory issues encountered when reading from zstd-compressed files. For
 * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
 *
 * TODO: Remove this workaround after upgrading to a Parquet version that includes PARQUET-2160.
should be TODO(SPARK-xxx):
LGTM
Thank you all. And feel free to merge and land this on the release branches, @sunchao. cc @cloud-fan, @HyukjinKwon, @mridulm, @tgravescs
Thanks! Merged to master/branch-3.3/branch-3.2.
@sunchao this also needs to be merged to branch-3.4 ...
Yes, it's in branch-3.4 as well.
@pan3793 We found that in some cases this workaround does not work, e.g. when reading Parquet files in the zstd codec and writing them out to a new path:

```scala
val df = spark.read.parquet("zstd_parquet_file")
df.repartition(3).write.mode("overwrite").parquet("/tmp/test")
```

In debug mode, you can see that ParquetCodecFactory is not set when the reader is initialized.
@yujhe is it possible to provide the thread stack as text? I will find some time to take a look later.
Sure, here is the thread stack.
We found that this happens if we are reading a Parquet file with nested columns in the schema:

```scala
val path = "/tmp/parquet_zstd"
(1 to 100).map(i => (i, Seq(i)))
  .toDF("id", "value")
  .repartition(1)
  .write
  .mode("overwrite")
  .parquet(path)

val df = spark.read.parquet(path)
df.write.mode("overwrite").parquet("/tmp/dummy")
```

After tracing the code, ParquetCodecFactory is only applied by VectorizedParquetRecordReader, i.e. when the vectorized reader is enabled. However, for schemas with nested columns, the vectorized reader is disabled by default in Spark 3.3.
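As an aside, one possible way to exercise the vectorized path (and hence this workaround) for such nested schemas would be to enable the nested-column vectorized reader. This is only a sketch, assuming the `spark.sql.parquet.enableNestedColumnVectorizedReader` config available in Spark 3.3, and is not part of this PR:

```scala
// Sketch only: force the vectorized reader for nested columns so that
// VectorizedParquetRecordReader (and thus ParquetCodecFactory) is used.
// The config name is an assumption based on Spark 3.3; verify before relying on it.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
spark.read.parquet(path).write.mode("overwrite").parquet("/tmp/dummy")
```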
@yujhe Oops, I totally forgot this one... Your analysis makes sense. I took a look at the Parquet non-vectorized reading code path, and injecting such a workaround there is not as clean as what we did in this PR. As this is not a regression, and newer Spark versions adopt a newer Parquet version that already fixes this issue, I prefer to keep things as-is in branch-3.3.
+1. It seems there's no elegant way to fix the path where native parquet-mr initializes the reader.
Totally understood; we will try to apply the PARQUET-2160 patch to parquet-hadoop-1.12.2 on our cluster.
What changes were proposed in this pull request?
SPARK-41952 was raised a while ago, but unfortunately the Parquet community has not published a patched release yet; as a workaround, we can fix the issue on the Spark side first.
We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors always occupy an unreasonable amount of off-heap memory and have a high risk of being killed by the NM.
See more discussions at apache/parquet-java#982 and apache/iceberg#5681.
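A minimal sketch of the migration scenario that triggers the issue; the table paths are hypothetical, and the snippet assumes the standard `spark.sql.parquet.compression.codec` setting rather than anything specific to this PR:

```scala
// Rewrite a snappy-compressed Parquet table as zstd, then scan it.
// With the leak, the scan steadily grows executor off-heap memory.
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.read.parquet("/warehouse/table_snappy")        // hypothetical source path
  .write.mode("overwrite").parquet("/warehouse/table_zstd")
spark.sql("SELECT sum(hash(*)) FROM parquet.`/warehouse/table_zstd`").show(false)
```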
Why are the changes needed?
The issue is fixed in the Parquet community by PARQUET-2160, but a release containing the fix is not available yet.
Does this PR introduce any user-facing change?
Yes, it's a bug fix.
How was this patch tested?
The existing UTs should cover the correctness check. I also verified this patch by scanning a large parquet/zstd table, using the setup below.
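The verification setup and query:

```
spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
```

```
spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false)
```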
Before this patch, all executors get killed by the NM quickly, with errors like:
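```
ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
```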
After this patch, the query runs well and no executor gets killed.