Skip to content

SPARK-3470 [CORE] [STREAMING] Add Closeable / close() to Java context objects #2346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.api.java

import java.io.Closeable
import java.util
import java.util.{Map => JMap}

Expand All @@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*/
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
class JavaSparkContext(val sc: SparkContext)
extends JavaSparkContextVarargsWorkaround with Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note two space indent here, but don't worry i will fix it.


/**
* Create a JavaSparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
Expand Down Expand Up @@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.stop()
}

override def close(): Unit = stop()

/**
* Get Spark's home location from either a value set through the constructor,
* or the spark.home Java property, or the SPARK_HOME environment variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import java.io.InputStream
import java.io.{Closeable, InputStream}
import java.util.{List => JList, Map => JMap}

import akka.actor.{Props, SupervisorStrategy}
Expand Down Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
* termination of a context by `stop()` or by an exception.
*/
class JavaStreamingContext(val ssc: StreamingContext) {
class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {

/**
* Create a StreamingContext.
Expand Down Expand Up @@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}

override def close(): Unit = stop()

}

/**
Expand Down