
Commit 48e2590

mccheah authored and bulldozer-bot[bot] committed
Data Source V2 Updates (#553)
This commit cherry-picks all the Data Source V2 related changes from upstream Spark, along with some dependent patches.
1 parent a3f2cf6 commit 48e2590

169 files changed: +8,663 −1,813 lines


docs/_data/menu-sql.yaml

Lines changed: 2 additions & 0 deletions
@@ -70,6 +70,8 @@
     url: sql-migration-guide-upgrade.html
   - text: Compatibility with Apache Hive
     url: sql-migration-guide-hive-compatibility.html
+  - text: SQL Reserved/Non-Reserved Keywords
+    url: sql-reserved-and-non-reserved-keywords.html
   - text: Reference
     url: sql-reference.html
     subitems:

docs/sql-reserved-and-non-reserved-keywords.md

Lines changed: 575 additions & 0 deletions
Large diffs are not rendered by default.

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala

Lines changed: 1 addition & 2 deletions
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
  * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
  *                     read by per-task consumers generated later.
  * @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- *                      are not Kafka consumer params.
+ * @param sourceOptions Params which are not Kafka consumer params.
  * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.
  * @param failOnDataLoss Flag indicating whether reading should fail in data loss

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

Lines changed: 3 additions & 4 deletions
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.UninterruptibleThread

 /**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
 private[kafka010] class KafkaMicroBatchStream(
     kafkaOffsetReader: KafkaOffsetReader,
     executorKafkaParams: ju.Map[String, Object],
-    options: DataSourceOptions,
+    options: CaseInsensitiveStringMap,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
     "kafkaConsumer.pollTimeoutMs",
     SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

-  private val maxOffsetsPerTrigger =
-    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+  private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

   private val rangeCalculator = KafkaOffsetRangeCalculator(options)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala

Lines changed: 3 additions & 3 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010

 import org.apache.kafka.common.TopicPartition

-import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap


 /**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int

 private[kafka010] object KafkaOffsetRangeCalculator {

-  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
-    val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
+  def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
+    val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
     new KafkaOffsetRangeCalculator(optionalValue)
   }
 }
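The pattern in this file and in KafkaMicroBatchStream above is the same: DataSourceOptions.get returned a java.util.Optional[String], so callers needed .orElse(null) before wrapping in Option, whereas CaseInsensitiveStringMap is a java.util.Map[String, String] whose get returns null for a missing key and resolves keys case-insensitively. A minimal standalone sketch of the new-style lookup, assuming only the upstream CaseInsensitiveStringMap class (the map contents and object name are illustrative):

import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

object OptionLookupSketch {
  def main(args: Array[String]): Unit = {
    // CaseInsensitiveStringMap wraps a java.util.Map[String, String]; get(...)
    // lower-cases the key and returns null when it is absent.
    val options = new CaseInsensitiveStringMap(
      Map("minPartitions" -> "10", "maxOffsetsPerTrigger" -> "1000").asJava)

    // Old style against DataSourceOptions:
    //   Option(options.get("minPartitions").orElse(null)).map(_.toInt)
    // New style: a plain null check via Option(...) is enough.
    val minPartitions: Option[Int] =
      Option(options.get("MINPARTITIONS")).map(_.toInt)          // Some(10); lookup is case-insensitive

    val maxOffsets: Option[Long] =
      Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)  // Some(1000L)

    println((minPartitions, maxOffsets))
  }
}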

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 33 additions & 24 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010

 import java.{util => ju}
-import java.util.{Locale, UUID}
+import java.util.{Collections, Locale, UUID}

 import scala.collection.JavaConverters._

@@ -33,9 +33,11 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
  * The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -47,7 +49,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamingWriteSupportProvider
     with TableProvider
     with Logging {
   import KafkaSourceProvider._
@@ -102,8 +103,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       failOnDataLoss(caseInsensitiveParams))
   }

-  override def getTable(options: DataSourceOptions): KafkaTable = {
-    new KafkaTable(strategy(options.asMap().asScala.toMap))
+  override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
+    new KafkaTable(strategy(options.asScala.toMap))
   }

   /**
@@ -180,20 +181,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }

-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    import scala.collection.JavaConverters._
-
-    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
-    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
-
-    new KafkaStreamingWriteSupport(topic, producerParams, schema)
-  }
-
   private def strategy(caseInsensitiveParams: Map[String, String]) =
     caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
       case ("assign", value) =>
@@ -365,23 +352,45 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead {
+    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {

     override def name(): String = s"Kafka $strategy"

     override def schema(): StructType = KafkaOffsetReader.kafkaSchema

-    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
+    override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
+
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
       override def build(): Scan = new KafkaScan(options)
     }
+
+    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
+      new WriteBuilder {
+        private var inputSchema: StructType = _
+
+        override def withInputDataSchema(schema: StructType): WriteBuilder = {
+          this.inputSchema = schema
+          this
+        }
+
+        override def buildForStreaming(): StreamingWrite = {
+          import scala.collection.JavaConverters._
+
+          assert(inputSchema != null)
+          val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
+          val producerParams = kafkaParamsForProducer(options.asScala.toMap)
+          new KafkaStreamingWrite(topic, producerParams, inputSchema)
+        }
+      }
+    }
   }

-  class KafkaScan(options: DataSourceOptions) extends Scan {
+  class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

     override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

     override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asScala.toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -410,7 +419,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }

     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asScala.toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
Lines changed: 4 additions & 4 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
 case object KafkaWriterCommitMessage extends WriterCommitMessage

 /**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
+ * A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
  *
  * @param topic The topic this writer is responsible for. If None, topic will be inferred from
  *              a `topic` field in the incoming data.
  * @param producerParams Parameters for Kafka producers in each task.
  * @param schema The schema of the input data.
  */
-class KafkaStreamingWriteSupport(
+class KafkaStreamingWrite(
     topic: Option[String],
     producerParams: ju.Map[String, Object],
     schema: StructType)
-  extends StreamingWriteSupport {
+  extends StreamingWrite {

   validateQuery(schema.toAttributes, producerParams, topic)
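Taken together with the KafkaSourceProvider hunks above, the write path now flows through the table: newWriteBuilder(options) produces a builder, the input schema is attached with withInputDataSchema, and buildForStreaming() yields the renamed StreamingWrite. A rough sketch of that hand-off, using only interfaces visible in these diffs; the helper name and the assumption that SupportsStreamingWrite exposes newWriteBuilder are illustrative, since the real driver-side wiring lives in Spark's streaming write planning, not in this module:

import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper showing only the order of calls.
object StreamingWriteSketch {
  def buildStreamingWrite(
      table: SupportsStreamingWrite,       // e.g. KafkaSourceProvider's KafkaTable
      options: CaseInsensitiveStringMap,
      inputSchema: StructType): StreamingWrite = {
    table.newWriteBuilder(options)
      .withInputDataSchema(inputSchema)    // must be called before buildForStreaming()
      .buildForStreaming()
  }
}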

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala

Lines changed: 24 additions & 77 deletions
@@ -22,9 +22,8 @@ import java.util.Locale
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
-import scala.collection.JavaConverters._

-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)

-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(input.toDF())(
+    val ex = intercept[AnalysisException] {
+      /* No topic field or topic option */
+      createKafkaWriter(input.toDF())(
         withSelectExpr = "value as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("topic option required when no 'topic' attribute is present"))

-    try {
+    val ex2 = intercept[AnalysisException] {
       /* No value field */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "value as key"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "required attribute 'value' not found"))
   }

@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)

-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
+    val ex = intercept[AnalysisException] {
       /* topic field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"CAST('1' as INT) as topic", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))

-    try {
+    val ex2 = intercept[AnalysisException] {
       /* value field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "value attribute type must be a string or binary"))

-    try {
+    val ex3 = intercept[AnalysisException] {
       /* key field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
       "key attribute type must be a string or binary"))
   }

@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", inputTopic)
       .load()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(
+
+    val ex = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.key.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'key.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'key.serializer' is not supported"))

-    try {
-      writer = createKafkaWriter(
+    val ex2 = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.value.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'value.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'value.serializer' is not supported"))
   }

   test("generic - write big data with small producer buffer") {
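The suite changes all follow one shape: with validation now happening when the write is set up, misconfiguration surfaces as an exception at createKafkaWriter time, so the tests replace the start-then-poll-writer.exception dance with ScalaTest's intercept. A tiny self-contained illustration of that assertion style (plain ScalaTest, nothing Kafka-specific; the require message is made up to mirror the suite's assertions):

import org.scalatest.FunSuite  // named AnyFunSuite in ScalaTest 3.1+

class InterceptStyleSuite extends FunSuite {
  test("intercept returns the thrown exception so its message can be asserted") {
    val ex = intercept[IllegalArgumentException] {
      // Stand-in for createKafkaWriter(...) failing eagerly during setup.
      require(requirement = false, "kafka option 'key.serializer' is not supported")
    }
    assert(ex.getMessage.contains("'key.serializer' is not supported"))
  }
}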
