@@ -23,6 +23,7 @@ import java.lang.invoke.SerializedLambda
2323import scala .collection .mutable .{Map , Set , Stack }
2424import scala .language .existentials
2525
26+ import com .google .common .cache .{Cache , CacheBuilder , RemovalListener , RemovalNotification }
2627import org .apache .xbean .asm6 .{ClassReader , ClassVisitor , MethodVisitor , Type }
2728import org .apache .xbean .asm6 .Opcodes ._
2829
@@ -35,18 +36,36 @@ import org.apache.spark.internal.Logging
3536private [spark] object ClosureCleaner extends Logging {
3637
3738 private val isScala2_11 = scala.util.Properties .versionString.contains(" 2.11" )
39+ val classBytesCache : Cache [String , Array [Byte ]] = CacheBuilder .newBuilder
40+ .maximumSize(10000 )
41+ .removalListener(new RemovalListener [String , Array [Byte ]]() {
42+ override def onRemoval (
43+ notification : RemovalNotification [String , Array [Byte ]]): Unit = {
44+ logInfo(" Remove class : " + notification.getKey)
45+ }
46+ })
47+ .build
48+ .asInstanceOf [Cache [String , Array [Byte ]]]
49+
3850
3951 // Get an ASM class reader for a given class from the JAR that loaded it
4052 private [util] def getClassReader (cls : Class [_]): ClassReader = {
4153 // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
4254 val className = cls.getName.replaceFirst(" ^.*\\ ." , " " ) + " .class"
43- val resourceStream = cls.getResourceAsStream (className)
44- if (resourceStream = = null ) {
45- null
55+ val bytes = classBytesCache.getIfPresent (className)
56+ if (bytes ! = null ) {
57+ new ClassReader ( new ByteArrayInputStream (bytes.clone()))
4658 } else {
47- val baos = new ByteArrayOutputStream (128 )
48- Utils .copyStream(resourceStream, baos, true )
49- new ClassReader (new ByteArrayInputStream (baos.toByteArray))
59+ val resourceStream = cls.getResourceAsStream(className)
60+ if (resourceStream == null ) {
61+ null
62+ } else {
63+ val baos = new ByteArrayOutputStream (128 )
64+ Utils .copyStream(resourceStream, baos, true )
65+ val array = baos.toByteArray
66+ classBytesCache.put(className, array)
67+ new ClassReader (new ByteArrayInputStream (array.clone()))
68+ }
5069 }
5170 }
5271
0 commit comments