Commit bf34d66

[SPARK-23144][SS] Added console sink for continuous processing
## What changes were proposed in this pull request?

Refactored ConsoleWriter into ConsoleMicroBatchWriter and ConsoleContinuousWriter.

## How was this patch tested?

New unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20311 from tdas/SPARK-23144.
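
For orientation, this is roughly how the new capability is exercised from user code, mirroring the rate-source setup in the test added by this commit; the `spark` session, the rate options, and the trigger interval below are illustrative assumptions, not part of the commit itself:

// Sketch: run a continuous-processing query against the console sink.
import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("rate")                 // built-in test source emitting (timestamp, value) rows
  .option("rowsPerSecond", "5")
  .load()

val query = df.writeStream
  .format("console")                          // resolves to ConsoleSinkProvider
  .trigger(Trigger.Continuous("1 second"))    // continuous mode; interval is the epoch/checkpoint interval
  .start()

// query.stop() when done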
1 parent 2d41f04 commit bf34d66

File tree

3 files changed: 96 additions, 30 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

Lines changed: 14 additions & 6 deletions
@@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Optional
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
+import org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, ConsoleMicroBatchWriter, ConsoleWriter}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -37,16 +36,25 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
 
 class ConsoleSinkProvider extends DataSourceV2
     with MicroBatchWriteSupport
+    with ContinuousWriteSupport
     with DataSourceRegister
     with CreatableRelationProvider {
 
   override def createMicroBatchWriter(
       queryId: String,
-      epochId: Long,
+      batchId: Long,
       schema: StructType,
       mode: OutputMode,
       options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    Optional.of(new ConsoleWriter(epochId, schema, options))
+    Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
+  }
+
+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): Optional[ContinuousWriter] = {
+    Optional.of(new ConsoleContinuousWriter(schema, options))
   }
 
   def createRelation(
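
Since ConsoleSinkProvider now mixes in both MicroBatchWriteSupport and ContinuousWriteSupport, the same `format("console")` sink serves both execution modes; which create*Writer method the engine invokes follows from the query's trigger. A hedged illustration (the DataFrame `df` and the intervals are placeholders, not from this commit):

import org.apache.spark.sql.streaming.Trigger

// Micro-batch trigger: the engine is expected to call createMicroBatchWriter,
// producing a ConsoleMicroBatchWriter per batch.
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// Continuous trigger: the engine is expected to call createContinuousWriter,
// producing a single ConsoleContinuousWriter that commits once per epoch.
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()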

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala

Lines changed: 60 additions & 20 deletions
@@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
-/**
- * A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console.
- * Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
-    extends DataSourceV2Writer with Logging {
+/** Common methods used to create writes for the the console sink */
+trait ConsoleWriter extends Logging {
+
+  def options: DataSourceV2Options
+
   // Number of rows to display, by default 20 rows
-  private val numRowsToShow = options.getInt("numRows", 20)
+  protected val numRowsToShow = options.getInt("numRows", 20)
 
   // Truncate the displayed data if it is too long, by default it is true
-  private val isTruncated = options.getBoolean("truncate", true)
+  protected val isTruncated = options.getBoolean("truncate", true)
 
   assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
+  protected val spark = SparkSession.getActiveSession.get
+
+  def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
 
-  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+  def abort(messages: Array[WriterCommitMessage]): Unit = {}
 
-  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
-    val batch = messages.collect {
+  protected def printRows(
+      commitMessages: Array[WriterCommitMessage],
+      schema: StructType,
+      printMessage: String): Unit = {
+    val rows = commitMessages.collect {
       case PackedRowCommitMessage(rows) => rows
     }.flatten
 
     // scalastyle:off println
     println("-------------------------------------------")
-    println(s"Batch: $batchId")
+    println(printMessage)
    println("-------------------------------------------")
     // scalastyle:off println
-    spark.createDataFrame(
-      spark.sparkContext.parallelize(batch), schema)
+    spark
+      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
       .show(numRowsToShow, isTruncated)
   }
+}
+
+
+/**
+ * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
+    extends DataSourceV2Writer with ConsoleWriter {
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Batch: $batchId")
+  }
+
+  override def toString(): String = {
+    s"ConsoleMicroBatchWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
+}
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
 
-  override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+/**
+ * A [[DataSourceV2Writer]] that collects results from a continuous query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
+    extends ContinuousWriter with ConsoleWriter {
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Continuous processing epoch $epochId")
+  }
+
+  override def toString(): String = {
+    s"ConsoleContinuousWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
 }
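
Both concrete writers read their display settings through the shared ConsoleWriter trait, so they are tuned identically from the DataStreamWriter; the option values below are arbitrary examples, not defaults from this commit:

// numRows and truncate map to options.getInt("numRows", 20) and
// options.getBoolean("truncate", true) in the ConsoleWriter trait.
df.writeStream
  .format("console")
  .option("numRows", "50")       // show up to 50 rows per batch/epoch (default 20)
  .option("truncate", "false")   // keep long cell values untruncated (default true)
  .start()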

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala

Lines changed: 22 additions & 4 deletions
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import java.io.ByteArrayOutputStream
 
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 
 class ConsoleWriterSuite extends StreamTest {
   import testImplicits._
 
-  test("console") {
+  test("microbatch - default") {
     val input = MemoryStream[Int]
 
     val captured = new ByteArrayOutputStream()
@@ -77,7 +79,7 @@ class ConsoleWriterSuite extends StreamTest {
       |""".stripMargin)
   }
 
-  test("console with numRows") {
+  test("microbatch - with numRows") {
     val input = MemoryStream[Int]
 
     val captured = new ByteArrayOutputStream()
@@ -106,7 +108,7 @@ class ConsoleWriterSuite extends StreamTest {
       |""".stripMargin)
   }
 
-  test("console with truncation") {
+  test("microbatch - truncation") {
     val input = MemoryStream[String]
 
     val captured = new ByteArrayOutputStream()
@@ -132,4 +134,20 @@ class ConsoleWriterSuite extends StreamTest {
       |
       |""".stripMargin)
   }
+
+  test("continuous - default") {
+    val captured = new ByteArrayOutputStream()
+    Console.withOut(captured) {
+      val input = spark.readStream
+        .format("rate")
+        .option("numPartitions", "1")
+        .option("rowsPerSecond", "5")
+        .load()
+        .select('value)
+
+      val query = input.writeStream.format("console").trigger(Trigger.Continuous(200)).start()
+      assert(query.isActive)
+      query.stop()
+    }
+  }
 }
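
The renamed micro-batch tests keep their original bodies (only the test names change in this diff). For readers without the full file, their shape is roughly the following reconstruction; it is illustrative rather than a verbatim quote of the suite:

// Approximate shape of a micro-batch console test: redirect Console.out,
// run a MemoryStream query against the console sink, then inspect the capture.
val input = MemoryStream[Int]
val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
  val query = input.toDF().writeStream.format("console").start()
  try {
    input.addData(1, 2, 3)
    query.processAllAvailable()
  } finally {
    query.stop()
  }
}
// printRows labels micro-batch output with "Batch: <id>", so the first batch shows up as "Batch: 0".
assert(captured.toString.contains("Batch: 0"))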
