@@ -41,63 +41,6 @@ internal class SparkStreamingIntegration : Integration() {
41
41
42
42
@Language(" kts" )
43
43
val _1 = listOf (
44
- // For when onInterrupt is implemented in the Jupyter kernel
45
- // """
46
- // val sscCollection = mutableSetOf<JavaStreamingContext>()
47
- // """.trimIndent(),
48
- // """
49
- // @JvmOverloads
50
- // fun withSparkStreaming(
51
- // batchDuration: Duration = Durations.seconds(1L),
52
- // checkpointPath: String? = null,
53
- // hadoopConf: Configuration = SparkHadoopUtil.get().conf(),
54
- // createOnError: Boolean = false,
55
- // props: Map<String, Any> = emptyMap(),
56
- // master: String = SparkConf().get("spark.master", "local[*]"),
57
- // appName: String = "Kotlin Spark Sample",
58
- // timeout: Long = -1L,
59
- // startStreamingContext: Boolean = true,
60
- // func: KSparkStreamingSession.() -> Unit,
61
- // ) {
62
- //
63
- // // will only be set when a new context is created
64
- // var kSparkStreamingSession: KSparkStreamingSession? = null
65
- //
66
- // val creatingFunc = {
67
- // val sc = SparkConf()
68
- // .setAppName(appName)
69
- // .setMaster(master)
70
- // .setAll(
71
- // props
72
- // .map { (key, value) -> key X value.toString() }
73
- // .asScalaIterable()
74
- // )
75
- //
76
- // val ssc = JavaStreamingContext(sc, batchDuration)
77
- // ssc.checkpoint(checkpointPath)
78
- //
79
- // kSparkStreamingSession = KSparkStreamingSession(ssc)
80
- // func(kSparkStreamingSession!!)
81
- //
82
- // ssc
83
- // }
84
- //
85
- // val ssc = when {
86
- // checkpointPath != null ->
87
- // JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)
88
- //
89
- // else -> creatingFunc()
90
- // }
91
- // sscCollection += ssc
92
- //
93
- // if (startStreamingContext) {
94
- // ssc.start()
95
- // kSparkStreamingSession?.invokeRunAfterStart()
96
- // }
97
- // ssc.awaitTerminationOrTimeout(timeout)
98
- // ssc.stop()
99
- // }
100
- // """.trimIndent(),
101
44
"""
102
45
println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""" .trimIndent(),
103
46
).map(::execute)
@@ -106,22 +49,4 @@ internal class SparkStreamingIntegration : Integration() {
106
49
override fun KotlinKernelHost.onShutdown () = Unit
107
50
108
51
override fun KotlinKernelHost.afterCellExecution (snippetInstance : Any , result : FieldValue ) = Unit
109
-
110
- // For when this feature is implemented in the Jupyter kernel
111
- // override fun KotlinKernelHost.onInterrupt() {
112
- //
113
- // @Language("kts")
114
- // val _1 = listOf(
115
- // """
116
- // while (sscCollection.isNotEmpty())
117
- // sscCollection.first().let {
118
- // it.stop()
119
- // sscCollection.remove(it)
120
- // }
121
- // """.trimIndent(),
122
- // """
123
- // println("onInterrupt cleanup!")
124
- // """.trimIndent()
125
- // ).map(::execute)
126
- // }
127
52
}
0 commit comments