[SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext

### What changes were proposed in this pull request?

This PR proposes to also remove the files, jars and archives added via Spark Connect sessions from the driver's file server when the session state is cleaned up.

### Why are the changes needed?

In [SPARK-44348](https://issues.apache.org/jira/browse/SPARK-44348), we clean up the files, jars and archives added to SparkContext, but we do not remove the corresponding entries from the file server, so they accumulate there.
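
To illustrate why there are two places to clean: when an artifact is added, it is tracked both in the per-session bookkeeping on `SparkContext` and in the driver's RPC file server. A minimal standalone sketch of that shape (toy stand-ins modeled on this diff, not Spark code; the names `addedFiles`/`servedFiles` and the key/timestamp types here are assumptions):

```scala
import scala.collection.concurrent.TrieMap

object LeakSketch {
  // Per-session bookkeeping: session UUID -> (file key -> add timestamp),
  // a toy stand-in for SparkContext.addedFiles.
  val addedFiles = TrieMap[String, TrieMap[String, Long]]()
  // Global registry of served files, a toy stand-in for the RPC file server.
  val servedFiles = TrieMap[String, String]()

  // Pre-SPARK-45014 cleanup: the session entry goes away, but every key it
  // contained stays registered in servedFiles indefinitely -- the slow leak.
  def cleanupBefore(uuid: String): Unit =
    addedFiles.remove(uuid)

  // Post-SPARK-45014 cleanup: each key is also dropped from the file server.
  def cleanupAfter(uuid: String): Unit =
    addedFiles.remove(uuid).foreach(_.keys.foreach(servedFiles.remove))
}
```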

### Does this PR introduce _any_ user-facing change?

Yes. It avoids slowly growing memory usage in the file server.

### How was this patch tested?

Manually tested. Existing tests should also not be broken.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42731 from HyukjinKwon/SPARK-45014.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Aug 31, 2023
1 parent ed03173 commit 9a023c4
Showing 3 changed files with 23 additions and 4 deletions.
@@ -208,12 +208,14 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
s"sessionId: ${sessionHolder.sessionId}")

// Clean up added files
sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
val fileserver = SparkEnv.get.rpcEnv.fileServer
val sparkContext = sessionHolder.session.sparkContext
sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))

// Clean up cached relations
val blockManager = sessionHolder.session.sparkContext.env.blockManager
val blockManager = sparkContext.env.blockManager
blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)

// Clean up artifacts folder
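Worth noting about the chained calls above: Scala's mutable-map `remove` returns an `Option`, so the `foreach` body runs only if the session actually registered artifacts, which keeps the cleanup idempotent. A standalone illustration of the same pattern (plain Scala collections, not Spark types; the key format shown is illustrative):

```scala
import scala.collection.mutable

val added = mutable.Map(
  "uuid-1" -> mutable.Map("spark://host:1234/files/a.txt" -> 1L))
val served = mutable.Set("spark://host:1234/files/a.txt")

// First cleanup: the inner map's keys are unregistered from `served`.
added.remove("uuid-1").foreach(_.keys.foreach(served.remove))
assert(served.isEmpty)

// Second cleanup of the same uuid: remove returns None, nothing runs.
added.remove("uuid-1").foreach(_.keys.foreach(served.remove))
```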
core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala (13 additions, 0 deletions)
@@ -206,6 +206,19 @@ private[spark] trait RpcEnvFileServer {
     fixedBaseUri
   }
 
+  /**
+   * Removes a file from this RpcEnv.
+   *
+   * @param key Local file to remove.
+   */
+  def removeFile(key: String): Unit
+
+  /**
+   * Removes a jar from this RpcEnv.
+   *
+   * @param key Local jar to remove.
+   */
+  def removeJar(key: String): Unit
 }
 
 private[spark] case class RpcEnvConfig(
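A hedged usage sketch of the two new trait methods, mirroring the artifact-manager call site above. Assumptions: the code lives under the `org.apache.spark` package (both `SparkEnv.rpcEnv` and `RpcEnvFileServer` are `private[spark]`), and the keys passed in are the same ones `SparkContext` stored when the artifacts were added:

```scala
package org.apache.spark.example

import org.apache.spark.SparkEnv

object FileServerCleanup {
  // Drops previously added artifacts from the driver-side file server.
  // Files and archives are both served as files; jars have their own map.
  def drop(fileKeys: Iterable[String], jarKeys: Iterable[String]): Unit = {
    val fileServer = SparkEnv.get.rpcEnv.fileServer
    fileKeys.foreach(fileServer.removeFile)
    jarKeys.foreach(fileServer.removeJar)
  }
}
```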
@@ -43,6 +43,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   private val jars = new ConcurrentHashMap[String, File]()
   private val dirs = new ConcurrentHashMap[String, File]()
 
+  override def removeFile(key: String): Unit = files.remove(key)
+
+  override def removeJar(key: String): Unit = jars.remove(key)
+
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
   }
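The overrides can be one-liners because `ConcurrentHashMap.remove` already behaves well here: it is thread-safe and a no-op for absent keys. A standalone check of that semantics (plain JDK behavior, nothing Spark-specific):

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

val files = new ConcurrentHashMap[String, File]()
files.put("a.txt", new File("/tmp/a.txt"))

assert(files.remove("a.txt") != null) // mapping removed, old value returned
assert(files.remove("a.txt") == null) // absent key: null returned, no exception
```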
