
[SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 #40091

Closed
wants to merge 4 commits

Conversation

pan3793
Member

@pan3793 pan3793 commented Feb 20, 2023

What changes were proposed in this pull request?

SPARK-41952 has been open for a while, but unfortunately the Parquet community has not published a patched release yet, so as a workaround we can fix the issue on the Spark side first.

We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors consistently occupy an unreasonable amount of off-heap memory and are at high risk of being killed by the NodeManager (NM).

See more discussions at apache/parquet-java#982 and apache/iceberg#5681

Why are the changes needed?

The issue is fixed in the Parquet community by PARQUET-2160, but a release containing the fix is not available yet.
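
To illustrate the failure mode (a sketch for illustration only, not the Spark or Parquet source): the problem boils down to zstd decompression streams that are opened but never closed, so the native buffers allocated by zstd-jni accumulate off-heap. The snippet below assumes zstd-jni (com.github.luben:zstd-jni, which Spark already bundles) is on the classpath, can be pasted into spark-shell, and contrasts the leak-prone shape with the eager read-and-close shape that a PARQUET-2160-style fix applies.

```
// Illustration only -- not the actual Spark/Parquet code.
// Assumes zstd-jni (com.github.luben:zstd-jni) is on the classpath.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

// Leak-prone shape: the lazy stream is handed to the caller and never closed,
// so the native buffers allocated by zstd-jni stay alive until GC happens to
// finalize the object -- if it ever does.
def decompressLazy(compressed: Array[Byte]): InputStream =
  new ZstdInputStream(new ByteArrayInputStream(compressed))

// Fixed shape (what a PARQUET-2160-style workaround does): read everything
// eagerly, then close the stream so the native buffers are freed right away.
def decompressEager(compressed: Array[Byte]): Array[Byte] = {
  val in = new ZstdInputStream(new ByteArrayInputStream(compressed))
  val out = new ByteArrayOutputStream()
  try {
    val chunk = new Array[Byte](8192)
    var n = in.read(chunk)
    while (n != -1) { out.write(chunk, 0, n); n = in.read(chunk) }
  } finally in.close()
  out.toByteArray
}

// Round-trip check.
val buf = new ByteArrayOutputStream()
val zout = new ZstdOutputStream(buf)
zout.write("hello zstd".getBytes("UTF-8")); zout.close()
assert(new String(decompressEager(buf.toByteArray), "UTF-8") == "hello zstd")
```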

Does this PR introduce any user-facing change?

Yes, it is a bug fix.

How was this patch tested?

The existing UTs should cover the correctness check. I also verified this patch by scanning a large parquet/zstd table.

```
spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
```

```
spark.sql("select sum(hash(*)) from parquet_zstd_table").show(false)
```
- before this patch

All executors get killed by NM quickly.

```
ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
```

<img width="1872" alt="image" src="https://user-images.githubusercontent.com/26535726/220031678-e9060244-5586-4f0c-8fe7-55bb4e20a580.png">

- after this patch

The query runs well; no executor gets killed.

<img width="1881" alt="image" src="https://user-images.githubusercontent.com/26535726/220031917-4fe38c07-b38f-49c6-a982-2091a6c2a8ed.png">

@github-actions github-actions bot added the SQL label Feb 20, 2023
Member

@dongjoon-hyun dongjoon-hyun left a comment

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-41952] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160 Feb 20, 2023
Member

@dongjoon-hyun dongjoon-hyun left a comment

BTW, do you happen to know why the Apache Parquet community didn't fill in the Fixed Version of that Parquet JIRA or cut a release, @pan3793? According to the log, it seems to have been merged in September 2022.

@pan3793
Member Author

pan3793 commented Feb 20, 2023

Based on apache/parquet-java#982 (comment), I guess the Parquet community may not consider it a critical issue, but in my case it is critical.

Contributor

@LuciferYang LuciferYang left a comment

For the 3.2/3.3/3.4 branches, I am OK with this fix, and we can revert ParquetCodecFactory.java after upgrading Parquet to a new version in master.

@LuciferYang
Contributor

branch-3.3/3.2 use Parquet 1.12.2; if this fix is accepted, would you mind submitting PRs for those two branches? @pan3793

@pan3793
Member Author

pan3793 commented Feb 20, 2023

> branch-3.3/3.2 use Parquet 1.12.2; if this fix is accepted, would you mind submitting PRs for those two branches? @pan3793

Sure.

@pan3793
Member Author

pan3793 commented Feb 20, 2023

I verified this patch by scanning a large parquet/zstd table and updated the PR description.

Contributor

@LuciferYang LuciferYang left a comment

+1, LGTM

```
 * workaround for memory issues encountered when reading from zstd-compressed files. For
 * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
 */
public class ParquetCodecFactory extends CodecFactory {
```
Contributor

I think we should add a TODO to remind us to revert this file when updating the Parquet version.

Member Author

Updated.

```
@@ -34,6 +34,8 @@
 * This class implements a codec factory that is used when reading from Parquet. It adds a
 * workaround for memory issues encountered when reading from zstd-compressed files. For
 * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
 *
 * TODO: Remove this workaround after upgrading Parquet which include PARQUET-2160.
```
Contributor

should be TODO(SPARK-xxx):

Member

@sunchao sunchao left a comment

LGTM

@dongjoon-hyun
Member

Thank you all. And feel free to merge and land this on the release branches, @sunchao.

cc @cloud-fan, @HyukjinKwon, @mridulm, @tgravescs

@sunchao sunchao closed this in 1688a87 Feb 20, 2023
sunchao pushed a commit that referenced this pull request Feb 20, 2023
sunchao pushed a commit that referenced this pull request Feb 20, 2023
@sunchao
Member

sunchao commented Feb 20, 2023

Thanks! Merged to master/branch-3.3/branch-3.2.

@LuciferYang
Contributor

LuciferYang commented Feb 20, 2023

@sunchao this also needs to be merged to branch-3.4 ...

sunchao pushed a commit that referenced this pull request Feb 20, 2023
@sunchao
Member

sunchao commented Feb 20, 2023

Yes, it's in branch-3.4 as well.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
@yujhe

yujhe commented Oct 11, 2023

@pan3793 We found that in some cases this workaround does not work when reading zstd-compressed Parquet files and writing them out to a new path.

val df = spark.read.parquet("zstd_parquet_file")
df.repartition(3).write.mode("overwrite").parquet("/tmp/test")


In debug mode, you can see that ParquetCodecFactory is not set when initializing org.apache.parquet.hadoop.ParquetFileReader, which is called from ParquetFileFormat.buildReaderWithPartitionValues.

@pan3793
Member Author

pan3793 commented Oct 11, 2023

@yujhe Could you provide the thread stack as text? I will find some time to take a look later.

@yujhe

yujhe commented Oct 11, 2023

Sure, here is the thread stack.

"Executor task launch worker for task 0.0 in stage 1.0 (TID 1)@7377" daemon prio=5 tid=0x36 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:770)
	  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
	  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
	  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
	  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:405)
	  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$Lambda$2412.203214861.apply(Unknown Source:-1)
	  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
	  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	  at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	  at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:341)
	  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$2521.582578288.apply(Unknown Source:-1)
	  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$2415.242150313.apply(Unknown Source:-1)
	  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	  at org.apache.spark.scheduler.Task.run(Task.scala:136)
	  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	  at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1345.354344687.apply(Unknown Source:-1)
	  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	  at java.lang.Thread.run(Thread.java:748)
```

@yujhe

yujhe commented Oct 31, 2023

We found that this happens when reading a Parquet file with nested columns in the schema.

val path = "/tmp/parquet_zstd"
(1 to 100).map(i => (i, Seq(i)))
  .toDF("id", "value")
  .repartition(1)
  .write
  .mode("overwrite")
  .parquet(path)

val df = spark.read.parquet(path)
df.write.mode("overwrite").parquet("/tmp/dummy")

After tracing the code, we found that ParquetCodecFactory is only applied to VectorizedParquetRecordReader, i.e. when the vectorized reader is enabled.

However, for schemas with nested columns, the vectorized reader is disabled by default in Spark 3.3 (spark.sql.parquet.enableNestedColumnVectorizedReader=false), so this workaround does not work in that case.
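
For reference, a hedged note (not from the discussion above): on branch-3.3, one way to route such nested-column scans back through the vectorized reader, which is the code path ParquetCodecFactory is wired into, is to enable the config mentioned above. Whether that is acceptable depends on the workload.

```
// Spark 3.3 spark-shell; the config name comes from the comment above.
// Enabling it makes nested-column scans use the vectorized Parquet reader,
// which is the code path this workaround covers.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")

val df = spark.read.parquet("/tmp/parquet_zstd")
df.write.mode("overwrite").parquet("/tmp/dummy")
```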

@pan3793
Member Author

pan3793 commented Oct 31, 2023

@yujhe Oops, I totally forgot this one...

Your analysis makes sense. I took a look at the Parquet non-vectorized reading code path; injecting such a workaround there is not as clean as what we did in this PR. Since this is not a regression, and newer Spark versions adopt a newer Parquet version that already fixes the issue, I prefer to keep things as-is in branch-3.3.

@LuciferYang
Contributor

> @yujhe Oops, I totally forgot this one...
>
> Your analysis makes sense. I took a look at the Parquet non-vectorized reading code path; injecting such a workaround there is not as clean as what we did in this PR. Since this is not a regression, and newer Spark versions adopt a newer Parquet version that already fixes the issue, I prefer to keep things as-is in branch-3.3.

+1. It seems there's no elegant way to fix the path where native parquet-mr initializes the reader.

@yujhe

yujhe commented Oct 31, 2023

Totally understood. We will try to apply the PARQUET-2160 patch to parquet-hadoop 1.12.2 on our cluster.
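
For anyone taking the same route, a possible (untested here) way to pick up a privately patched parquet-hadoop jar without rebuilding Spark is to put it on the driver and executor classpaths; the jar path below is hypothetical, and classpath ordering relative to the bundled parquet-hadoop should be verified for your deployment.

```
spark-shell \
  --jars /path/to/parquet-hadoop-1.12.2-patched.jar \
  --conf spark.driver.extraClassPath=/path/to/parquet-hadoop-1.12.2-patched.jar \
  --conf spark.executor.extraClassPath=/path/to/parquet-hadoop-1.12.2-patched.jar
```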
