
Commit 604d679

edrevo authored and cloud-fan committed
[SPARK-30226][SQL] Remove withXXX functions in WriteBuilder
### What changes were proposed in this pull request?

Adding a `LogicalWriteInfo` interface as suggested by cloud-fan in #25990 (comment).

### Why are the changes needed?

It provides compile-time guarantees where we previously had none, which will make it harder to introduce bugs in the future.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Compiles and passes tests.

Closes #26678 from edrevo/add-logical-write-info.

Lead-authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Co-authored-by: Ximo Guanter
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 3eade74 commit 604d679
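To make the shape of the change concrete, here is a minimal sketch of a connector written against the new `newWriteBuilder(LogicalWriteInfo)` entry point. It is not part of this commit; `ExampleTable` and `ExampleWriteBuilder` are hypothetical names used for illustration only. Under the removed API the builder received the schema and query id through optional `withInputDataSchema`/`withQueryId` setters, so a forgotten call only surfaced at runtime; with `LogicalWriteInfo` everything is available when the builder is constructed.

// Hypothetical connector, not part of this commit: illustrates the API change only.
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

class ExampleTable extends SupportsWrite {
  override def name(): String = "example"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  // Previously: newWriteBuilder(options: CaseInsensitiveStringMap), with the schema and
  // query id pushed in later through withInputDataSchema/withQueryId.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new ExampleWriteBuilder(info)
}

class ExampleWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
  // All logical write information is available at construction time, so vars and
  // null checks for the schema and query id are no longer needed.
  private val schema: StructType = info.schema()
  private val queryId: String = info.queryId()
  private val options = info.options()

  override def buildForBatch(): BatchWrite = {
    // A real connector would build its BatchWrite from `schema` and `options` here;
    // this sketch only shows that they are already available.
    throw new UnsupportedOperationException(
      s"example sink for query $queryId, ${schema.length} columns, ${options.size()} options")
  }
}

The Avro and Kafka changes below follow exactly this pattern.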

File tree

34 files changed: +207 additions, -179 deletions


external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroUtils
-import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -42,8 +42,8 @@ case class AvroTable(
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
 
-  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
-    new AvroWriteBuilder(options, paths, formatName, supportsDataType)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
+    new AvroWriteBuilder(paths, formatName, supportsDataType, info)
 
   override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala

Lines changed: 4 additions & 4 deletions

@@ -19,18 +19,18 @@ package org.apache.spark.sql.v2.avro
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.sql.avro.AvroUtils
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class AvroWriteBuilder(
-    options: CaseInsensitiveStringMap,
     paths: Seq[String],
     formatName: String,
-    supportsDataType: DataType => Boolean)
-  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+    supportsDataType: DataType => Boolean,
+    info: LogicalWriteInfo)
+  extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 4 additions & 8 deletions

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -392,18 +392,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
       () => new KafkaScan(options)
 
-    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
       new WriteBuilder {
-        private var inputSchema: StructType = _
+        private val options = info.options
+        private val inputSchema: StructType = info.schema()
         private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
         private val producerParams =
           kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap))
 
-        override def withInputDataSchema(schema: StructType): WriteBuilder = {
-          this.inputSchema = schema
-          this
-        }
-
         override def buildForBatch(): BatchWrite = {
           assert(inputSchema != null)
           new KafkaBatchWrite(topic, producerParams, inputSchema)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java

Lines changed: 5 additions & 5 deletions

@@ -21,8 +21,8 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * Represents a table which is staged for being committed to the metastore.
@@ -32,10 +32,10 @@
 * {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
 * {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
 * table for being written to. This table should usually implement {@link SupportsWrite}. A new
- * writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
- * and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
- * at which point implementations are expected to commit the table's metadata into the metastore
- * along with the data that was written by the writes from the write builder this table created.
+ * writer will be constructed via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}, and the
+ * write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, at
+ * which point implementations are expected to commit the table's metadata into the metastore along
+ * with the data that was written by the writes from the write builder this table created.
 */
@Experimental
public interface StagedTable extends Table {

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java

Lines changed: 4 additions & 4 deletions

@@ -21,13 +21,13 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
@@ -39,9 +39,9 @@
 * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
 * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
 * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
- * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
- * write operation fails, the catalog will have already dropped the table, and the planner cannot
- * roll back the dropping of the table.
+ * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
+ * However, if the write operation fails, the catalog will have already dropped the table, and the
+ * planner cannot roll back the dropping of the table.
 * <p>
 * If the catalog implements this plugin, the catalog can implement the methods to "stage" the
 * creation and the replacement of a table. After the table's

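The Javadoc above describes a multi-step flow: stage the table, write through a write builder, then commit the staged changes. Below is a rough sketch of that flow under the new signature, assuming `catalog`, `ident`, `querySchema`, and `writeOptions` are supplied by the caller. It is illustrative only; the real logic lives in Spark's physical CTAS plans, and because `LogicalWriteInfoImpl` is `private[sql]`, code like this has to live inside Spark's sql packages.

// Rough sketch of a staged CREATE TABLE AS SELECT, assumed to run planner-side.
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsWrite}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object StagedCtasSketch {
  def run(
      catalog: StagingTableCatalog,
      ident: Identifier,
      querySchema: StructType,
      writeOptions: CaseInsensitiveStringMap): Unit = {
    // 1. Stage the table so nothing becomes visible if the write fails.
    val staged = catalog.stageCreate(
      ident, querySchema, Array.empty[Transform], Map.empty[String, String].asJava)
    try {
      // 2. Build the write from a single LogicalWriteInfo (was: withQueryId/withInputDataSchema).
      val info = LogicalWriteInfoImpl(
        queryId = UUID.randomUUID().toString,
        schema = querySchema,
        options = writeOptions)
      val batchWrite = staged.asInstanceOf[SupportsWrite].newWriteBuilder(info).buildForBatch()
      // 3. Run the write tasks and commit them (elided): batchWrite.commit(commitMessages)
      // 4. Publish the table's metadata and data together.
      staged.commitStagedChanges()
    } catch {
      case e: Throwable =>
        staged.abortStagedChanges()
        throw e
    }
  }
}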
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java

Lines changed: 4 additions & 4 deletions

@@ -19,13 +19,13 @@
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
 * A mix-in interface of {@link Table}, to indicate that it's writable. This adds
- * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
- * for batch or streaming.
+ * {@link #newWriteBuilder(LogicalWriteInfo)} that is used to create a
+ * write for batch or streaming.
 */
@Experimental
public interface SupportsWrite extends Table {
@@ -34,5 +34,5 @@ public interface SupportsWrite extends Table {
   * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
   * this method to configure each data source write.
   */
-  WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
+  WriteBuilder newWriteBuilder(LogicalWriteInfo info);
 }
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * This interface contains logical write information that data sources can use when generating a
+ * {@link WriteBuilder}.
+ */
+@Evolving
+public interface LogicalWriteInfo {
+  /**
+   * the options that the user specified when writing the dataset
+   */
+  CaseInsensitiveStringMap options();
+
+  /**
+   * `queryId` is a unique string of the query. It's possible that there are many queries
+   * running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use
+   * this id to identify the query.
+   */
+  String queryId();
+
+  /**
+   * the schema of the input data from Spark to data source.
+   */
+  StructType schema();
+}
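Because `LogicalWriteInfo` is a plain interface, connector code outside Spark (for example a connector's unit tests) can construct one by hand rather than relying on the `private[sql]` `LogicalWriteInfoImpl` helper added later in this commit. A minimal sketch; the query id, schema, and the "path" option are placeholders, not values Spark would supply:

// Hand-rolled LogicalWriteInfo for a connector test (illustrative only).
import java.util.Collections

import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object TestWriteInfo {
  val info: LogicalWriteInfo = new LogicalWriteInfo {
    override def queryId(): String = "test-query-0"
    override def schema(): StructType = new StructType().add("value", StringType)
    override def options(): CaseInsensitiveStringMap =
      new CaseInsensitiveStringMap(Collections.singletonMap("path", "/tmp/example"))
  }
}

// Usage, e.g. against the hypothetical ExampleTable sketch near the top of this page:
//   new ExampleTable().newWriteBuilder(TestWriteInfo.info)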

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java

Lines changed: 0 additions & 23 deletions

@@ -21,7 +21,6 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
-import org.apache.spark.sql.types.StructType;
 
 /**
 * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
@@ -33,28 +32,6 @@
 @Evolving
 public interface WriteBuilder {
 
-  /**
-   * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's
-   * possible that there are many queries running at the same time, or a query is restarted and
-   * resumed. {@link BatchWrite} can use this id to identify the query.
-   *
-   * @return a new builder with the `queryId`. By default it returns `this`, which means the given
-   *         `queryId` is ignored. Please override this method to take the `queryId`.
-   */
-  default WriteBuilder withQueryId(String queryId) {
-    return this;
-  }
-
-  /**
-   * Passes the schema of the input data from Spark to data source.
-   *
-   * @return a new builder with the `schema`. By default it returns `this`, which means the given
-   *         `schema` is ignored. Please override this method to take the `schema`.
-   */
-  default WriteBuilder withInputDataSchema(StructType schema) {
-    return this;
-  }
-
   /**
   * Returns a {@link BatchWrite} to write data to batch source. By default this method throws
   * exception, data sources must overwrite this method to provide an implementation, if the
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+private[sql] case class LogicalWriteInfoImpl(
+    queryId: String,
+    schema: StructType,
+    options: CaseInsensitiveStringMap) extends LogicalWriteInfo

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala

Lines changed: 2 additions & 2 deletions

@@ -95,8 +95,8 @@ class InMemoryTable(
     override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
   }
 
-  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
-    InMemoryTable.maybeSimulateFailedTableWrite(options)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryTable.maybeSimulateFailedTableWrite(info.options)
 
     new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
       private var writer: BatchWrite = Append

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/StagingInMemoryTableCatalog.scala

Lines changed: 3 additions & 3 deletions

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -88,8 +88,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
 
     override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities
 
-    override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
-      delegateTable.newWriteBuilder(options)
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+      delegateTable.newWriteBuilder(info)
     }
 
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
@@ -39,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
 }
 
 private[noop] object NoopTable extends Table with SupportsWrite {
-  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
   override def capabilities(): util.Set[TableCapability] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala

Lines changed: 6 additions & 16 deletions

@@ -30,34 +30,24 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.SerializableConfiguration
 
 abstract class FileWriteBuilder(
-    options: CaseInsensitiveStringMap,
     paths: Seq[String],
     formatName: String,
-    supportsDataType: DataType => Boolean) extends WriteBuilder {
-  private var schema: StructType = _
-  private var queryId: String = _
+    supportsDataType: DataType => Boolean,
+    info: LogicalWriteInfo) extends WriteBuilder {
+  private val schema = info.schema()
+  private val queryId = info.queryId()
+  private val options = info.options()
   private var mode: SaveMode = _
 
-  override def withInputDataSchema(schema: StructType): WriteBuilder = {
-    this.schema = schema
-    this
-  }
-
-  override def withQueryId(queryId: String): WriteBuilder = {
-    this.queryId = queryId
-    this
-  }
-
   def mode(mode: SaveMode): WriteBuilder = {
     this.mode = mode
     this

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala

Lines changed: 7 additions & 4 deletions

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
-import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,9 +98,12 @@ sealed trait V1FallbackWriters extends SupportsV1Write {
   }
 
   protected def newWriteBuilder(): V1WriteBuilder = {
-    val writeBuilder = table.newWriteBuilder(writeOptions)
-      .withInputDataSchema(plan.schema)
-      .withQueryId(UUID.randomUUID().toString)
+    val info = LogicalWriteInfoImpl(
+      queryId = UUID.randomUUID().toString,
+      schema = plan.schema,
+      options = writeOptions)
+    val writeBuilder = table.newWriteBuilder(info)
+
    writeBuilder.asV1Builder
  }
 }
