Memory leaks when running the TPC-H benchmark repeatedly #884

Closed
Kontinuation opened this issue Aug 29, 2024 · 7 comments · Fixed by #890
Labels
bug Something isn't working

Comments

@Kontinuation (Member) commented Aug 29, 2024

Describe the bug

I've built DataFusion Comet at commit f7f0bb1 for Spark 3.5.1. I found that memory usage keeps increasing when the TPC-H benchmark script is run repeatedly on a set of Parquet files. The Parquet files were generated using https://github.com/databricks/spark-sql-perf with scale factor = 10. Memory usage could climb as high as 20GB, which seems problematic given the Spark and Comet configurations I'm using to run the benchmarks (see Additional context).

[image: memory usage of the Spark process growing over repeated benchmark iterations]

Using jcmd <pid> VM.native_memory detail.diff | grep Unsafe -A 2, I've noticed that the native memory allocated by Unsafe_AllocateMemory0 keeps increasing. I'm not enabling off-heap memory, so the allocations should be initiated by the Arrow RootAllocator:

Initially, right after setting the baseline:

[0x00000001099c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x000000011a0523b4]
                             (malloc=870721KB type=Other +621478KB #6842866 +4937676)
--
[0x00000001099c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x0000000119017be0]
                             (malloc=8463KB type=Other -469KB #221 -3)

After 10 minutes:

[0x00000001099c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x000000011a0523b4]
                             (malloc=4349265KB type=Other +4100021KB #34671096 +32765906)
--
[0x00000001099c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x0000000119017be0]
                             (malloc=8449KB type=Other -483KB #217 -7)
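
For reference, these diffs come from the JDK's Native Memory Tracking workflow, roughly as follows (<pid> stands for the Spark driver's process id; NMT must be enabled at JVM startup, as in the extraJavaOptions shown under Additional context below):

# JVM started with -XX:NativeMemoryTracking=detail
jcmd <pid> VM.native_memory baseline
# ... run the benchmark for a while, then diff against the baseline:
jcmd <pid> VM.native_memory detail.diff | grep Unsafe -A 2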

The leaked memory was allocated by the CometArrowAllocator. I verified this by attaching a debugger to the Spark process and inspecting CometArrowAllocator.getAllocatedMemory:

[image: debugger screenshot of CometArrowAllocator.getAllocatedMemory]

I've also deliberately disabled AQE coalesce partitions because of #381. Although that issue is fixed, I still disabled it to be safe. See the Additional context section for more details.

Steps to reproduce

Run the TPC-H benchmark script with --iterations=100 and observe the RSS of the Spark process increase over time.

Expected behavior

Memory usage should not increase over time.

Additional context

I'm simply running it locally with master = local[4]. Here are my test environment and Spark configurations:

Environment:

  • Operating System: macOS 14.6.1, arch: Apple M1 Pro
  • Apache Spark: 3.5.1
  • Datafusion Comet: commit f7f0bb1
  • JVM: 17.0.10 (Eclipse Adoptium)

Spark configurations:

spark.master                     local[4]
spark.driver.cores               4
spark.executor.cores             4
spark.driver.memory              4g
spark.executor.memory            4g
spark.comet.memory.overhead.factor 0.4

spark.jars                     /path/to/workspace/github/datafusion-comet/spark/target/comet-spark-spark3.5_2.12-0.3.0-SNAPSHOT.jar

spark.driver.extraClassPath    /path/to/workspace/github/datafusion-comet/spark/target/comet-spark-spark3.5_2.12-0.3.0-SNAPSHOT.jar
spark.executor.extraClassPath  /path/to/workspace/github/datafusion-comet/spark/target/comet-spark-spark3.5_2.12-0.3.0-SNAPSHOT.jar

spark.serializer          org.apache.spark.serializer.KryoSerializer

spark.sql.extensions         org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled          true
spark.comet.exec.enabled     true
spark.comet.exec.all.enabled true
spark.comet.explainFallback.enabled false

spark.comet.exec.shuffle.enabled true
spark.comet.exec.shuffle.mode auto
spark.shuffle.manager org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager

# Disable AQE coalesce partitions
spark.sql.adaptive.enabled   false
spark.sql.adaptive.coalescePartitions.enabled  false

# Enable debugging and native memory tracking
spark.driver.extraJavaOptions  -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -XX:NativeMemoryTracking=detail
@Kontinuation Kontinuation added the bug Something isn't working label Aug 29, 2024
@Kontinuation Kontinuation changed the title Memory leaks found when running the TPC-H benchmark repeatedly Memory leaks when running the TPC-H benchmark repeatedly Aug 29, 2024
@Kontinuation (Member Author)

I've investigated the problem and found that the leak is caused by these 2 allocations: https://github.com/apache/datafusion-comet/blob/0.2.0/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala#L65-L66

val arrowSchema = ArrowSchema.allocateNew(allocator)
val arrowArray = ArrowArray.allocateNew(allocator)

These construct the Arrow C Data structures for transferring Arrow batch vectors from Scala (JVM) to the native executor (Rust). The native executor moves the transferred vectors and takes ownership of them, but the arrowSchema and arrowArray base structures allocated in the JVM never get released. Each time we transfer a batch from the JVM to the native executor, we leak two base structures' worth of memory.
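
To illustrate the lifecycle, here is a minimal sketch using the stock Arrow Java C Data API; Data.exportVectorSchemaRoot stands in for Comet's actual export path, root / dictionaryProvider are assumed to be in scope, and nativeExecutor.processBatch is a hypothetical stand-in for the JNI hand-off:

import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}

val arrowSchema = ArrowSchema.allocateNew(allocator)
val arrowArray = ArrowArray.allocateNew(allocator)
try {
  // Export the batch into the C Data structs and hand their addresses
  // to the native executor, which moves the buffers out and eventually
  // invokes the release callback.
  Data.exportVectorSchemaRoot(allocator, root, dictionaryProvider, arrowArray, arrowSchema)
  nativeExecutor.processBatch(arrowArray.memoryAddress(), arrowSchema.memoryAddress()) // hypothetical JNI hand-off
} finally {
  // The release callback frees the exported buffers, but not the base
  // structs themselves; without these close() calls a small off-heap
  // allocation leaks once per exported batch.
  arrowArray.close()
  arrowSchema.close()
}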

I applied a fix on my fork and the problem went away: Kontinuation@a90f43a

[image: memory usage curve after applying the fix]

The native memory allocated by Unsafe_AllocateMemory0 becomes pretty small and constant:

[0x00000001071c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x00000001170c05b4]
                             (malloc=15386KB type=Other +1395KB #44 -123)
--
[0x00000001071c98a8] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
[0x0000000116817be0]
                             (malloc=8418KB type=Other -16KB #36 -5)

Running the TPC-H benchmarks with AQE coalesce partitions enabled still leaks memory; I'm still investigating that.

@Kontinuation (Member Author) commented Aug 29, 2024

Finally, I fixed the memory leak caused by AQE coalesce partitions: Kontinuation@8657a82

The ArrowStreamWriter holds Arrow dictionaries internally; the native memory held by those dictionaries leaks if the writer is not closed.
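
As a sketch of that pattern (root, dictionaryProvider, and outputChannel are illustrative names, not Comet's actual code):

import org.apache.arrow.vector.ipc.ArrowStreamWriter

val writer = new ArrowStreamWriter(root, dictionaryProvider, outputChannel)
try {
  writer.start()
  writer.writeBatch() // the writer caches dictionary vectors internally
  writer.end()
} finally {
  writer.close() // releases the native memory held by those dictionaries
}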

I've also tried reverting #613 on top of my fix; the allocator in StreamReader could then be closed without reporting leaks when running all 22 TPC-H queries.

@viirya (Member) commented Aug 29, 2024

These construct the Arrow C Data structures for transferring Arrow batch vectors from Scala (JVM) to the native executor (Rust). The native executor moves the transferred vectors and takes ownership of them, but the arrowSchema and arrowArray base structures allocated in the JVM never get released. Each time we transfer a batch from the JVM to the native executor, we leak two base structures' worth of memory.

Hmm, the arrowSchema and arrowArray structures should be automatically released when the native side drops the imported array/schema. The release callback should do that. I think this follows the C Data interface.

@Kontinuation (Member Author)

The release callback should do that

I'm afraid that's not the case. To my understanding, the release callback frees everything except the base structure. Please refer to the reference implementation of the release handler in the specification.

@viirya (Member) commented Aug 29, 2024

I'm afraid that's not the case. To my understanding, the release callback frees everything except the base structure. Please refer to the reference implementation of the release handler in the specification.

If you are referring to the JVM instances of ArrowSchema and ArrowArray, they are not referenced after exportBatch. How would they be kept alive in the JVM?

@Kontinuation (Member Author) commented Aug 29, 2024

I'm afraid that's not the case. To my understanding, the release callback frees everything except the base structure. Please refer to the reference implementation of the release handler in the specification.

If you are referring to the JVM instances of ArrowSchema and ArrowArray, they are not referenced after exportBatch. How would they be kept alive in the JVM?

ArrowSchema and ArrowArray hold native memory. Garbage collecting these two objects does not release that native memory; it has to be deallocated manually by calling the close method.
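
A tiny standalone snippet makes this concrete (a hypothetical demo, but using only stock Arrow Java APIs):

import org.apache.arrow.c.{ArrowArray, ArrowSchema}
import org.apache.arrow.memory.RootAllocator

val allocator = new RootAllocator()
val schema = ArrowSchema.allocateNew(allocator) // allocates an off-heap struct
val array = ArrowArray.allocateNew(allocator)
println(allocator.getAllocatedMemory) // non-zero: the structs are accounted here
// Dropping the JVM references and running a GC would not return this memory;
// only close() releases the underlying ArrowBufs.
array.close()
schema.close()
println(allocator.getAllocatedMemory) // back to 0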

@viirya (Member) commented Aug 29, 2024

ArrowSchema and ArrowArray hold native memory. Garbage collecting these two objects does not release that native memory; it has to be deallocated manually by calling the close method.

Ah, I took another look at the JVM ArrowArray. The internal ArrowBuf should be released separately (done by close). Makes sense.
