[SPARK-33577][SS] Add support for V1Table in stream writer table API and create table if not exist by default

### What changes were proposed in this pull request?
After SPARK-32896, we have a table API for the stream writer, but it only supports DataSource V2 tables. Here we add the following enhancements (a usage sketch follows the list):

- Create non-existing tables by default
- Support both managed and external V1Tables
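
A minimal usage sketch of the enhanced `toTable` API (the session setup, source, table name, and paths below are illustrative, not taken from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("toTableSketch").getOrCreate()

// Any streaming DataFrame works; the rate source is just a convenient stand-in.
val events = spark.readStream.format("rate").load()

// `rate_events` does not need to exist beforehand: with this change the table
// is created automatically (using the stream's schema) before the query starts.
val query = events.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/rate_events") // illustrative path
  .toTable("rate_events")
```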

### Why are the changes needed?
Make the API cover more use cases, especially file-provider-based tables.
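
A sketch of the file-provider case, assuming a parquet table and illustrative paths: a table with an explicit location can now be the sink; it is created at that path if missing, and an existing parquet V1Table (managed or external) is accepted as well.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("externalTableSketch").getOrCreate()

val events = spark.readStream.format("rate").load()

// `stream_test` is created as a parquet table whose data lives at the given
// external path when the table does not exist yet; if it already exists, its
// provider must match the declared format ("parquet" here).
val query = events.writeStream
  .format("parquet")
  .option("path", "/tmp/external/stream_test")                  // illustrative location
  .option("checkpointLocation", "/tmp/checkpoints/stream_test") // illustrative location
  .toTable("stream_test")
```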

### Does this PR introduce _any_ user-facing change?
Yes, new features added.

### How was this patch tested?
Add new UTs.

Closes #30521 from xuanyuanking/SPARK-33577.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
xuanyuanking authored and HeartSaVioR committed Dec 4, 2020
1 parent 94c144b commit 325abf7
Showing 2 changed files with 188 additions and 64 deletions.
DataStreamWriter.scala
@@ -22,12 +22,16 @@ import java.util.concurrent.TimeoutException

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,52 +302,85 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

/**
* Starts the execution of the streaming query, which will continually output results to the given
* table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
* table as new data arrives. A new table will be created if the table does not exist. The returned
* [[StreamingQuery]] object can be used to interact with the stream.
*
* @since 3.1.0
*/
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery = {
this.source = SOURCE_NAME_TABLE
this.tableName = tableName
startInternal(None)
}

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}
import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier

if (source == SOURCE_NAME_TABLE) {
assertNotPartitioned(SOURCE_NAME_TABLE)
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
.parseMultipartIdentifier(tableName)
val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier

import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier
// Currently we don't create a logical streaming writer node in logical plan, so cannot rely
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
}

if (!catalog.asTableCatalog.tableExists(identifier)) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
.parseMultipartIdentifier(tableName)
val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier

// Currently we don't create a logical streaming writer node in logical plan, so cannot rely
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
}
/**
* Note, currently the new table creation by this API doesn't fully cover the V2 table.
* TODO (SPARK-33638): Full support of v2 table creation
*/
val cmd = CreateTableStatement(
originalMultipartIdentifier,
df.schema.asNullable,
partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
None,
Map.empty[String, String],
Some(source),
Map.empty[String, String],
extraOptions.get("path"),
None,
None,
external = false,
ifNotExists = false)
Dataset.ofRows(df.sparkSession, cmd)
}

val tableInstance = catalog.asTableCatalog.loadTable(identifier)
val tableInstance = catalog.asTableCatalog.loadTable(identifier)

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val sink = tableInstance match {
case t: SupportsWrite if t.supports(STREAMING_WRITE) => t
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
def writeToV1Table(table: CatalogTable): StreamingQuery = {
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(s"Streaming into views $tableName is not supported.")
}
require(table.provider.isDefined)
if (source != table.provider.get) {
throw new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
}
format(table.provider.get)
.option("path", new Path(table.location).toString).start()
}

startQuery(sink, extraOptions)
} else if (source == SOURCE_NAME_MEMORY) {
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
tableInstance match {
case t: SupportsWrite if t.supports(STREAMING_WRITE) => startQuery(t, extraOptions)
case t: V2TableWithV1Fallback =>
writeToV1Table(t.v1Table)
case t: V1Table =>
writeToV1Table(t.v1Table)
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
}
}

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}

if (source == SOURCE_NAME_MEMORY) {
assertNotPartitioned(SOURCE_NAME_MEMORY)
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
DataStreamTableAPISuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog, InMemoryTableSessionCatalog}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.sources.FakeScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
import testImplicits._
@@ -175,21 +176,24 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
test("write: write to table with custom catalog & no namespace") {
val tableIdentifier = "testcat.table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to table with custom catalog & namespace") {
spark.sql("CREATE NAMESPACE testcat.ns")

val tableIdentifier = "testcat.ns.table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to table with default session catalog") {
@@ -200,35 +204,19 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
spark.sql("CREATE NAMESPACE ns")

val tableIdentifier = "ns.table_name"
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to non-exist table with custom catalog") {
val tableIdentifier = "testcat.nonexisttable"
spark.sql("CREATE NAMESPACE testcat.ns")

withTempDir { checkpointDir =>
val exc = intercept[NoSuchTableException] {
runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("nonexisttable"))
}
}

test("write: write to file provider based table isn't allowed yet") {
val tableIdentifier = "table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING parquet")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

withTempDir { checkpointDir =>
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("doesn't support streaming write"))
withTable(tableIdentifier) {
runTestWithStreamAppend(tableIdentifier)
}
}

@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("doesn't support streaming write"))
assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is not supported"))
}
}

test("write: write to an external table") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
checkForStreamTable(Some(dir), tableName)
}
}
}

test("write: write to a managed table") {
val tableName = "stream_test"
withTable(tableName) {
checkForStreamTable(None, tableName)
}
}

test("write: write to an external table with existing path") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet")
.option("path", dir.getCanonicalPath).saveAsTable(tableName)

checkForStreamTable(Some(dir), tableName)
}
}
}

test("write: write to a managed table with existing path") {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)

checkForStreamTable(None, tableName)
}
}

test("write: write to an external path and create table") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we load files from the metadata log instead of listing them
// using HDFS API.
Seq(4, 5, 6).toDF("value").write
.mode("append").format("parquet").save(dir.getCanonicalPath)

checkForStreamTable(Some(dir), tableName)
}
}
}

test("write: write to table with different format shouldn't be allowed") {
val tableName = "stream_test"

spark.sql(s"CREATE TABLE $tableName (id bigint, data string) USING json")
checkAnswer(spark.table(tableName), Seq.empty)

withTempDir { checkpointDir =>
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(tableName, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("The input source(parquet) is different from the table " +
s"$tableName's data source provider(json)"))
}
}

private def checkForStreamTable(dir: Option[File], tableName: String): Unit = {
val memory = MemoryStream[Int]
val dsw = memory.toDS().writeStream.format("parquet")
dir.foreach { output =>
dsw.option("path", output.getCanonicalPath)
}
val sq = dsw
.option("checkpointLocation", Utils.createTempDir().getCanonicalPath)
.toTable(tableName)
memory.addData(1, 2, 3)
sq.processAllAvailable()

checkDataset(
spark.table(tableName).as[Int],
1, 2, 3)
val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val path = if (dir.nonEmpty) {
dir.get
} else {
new File(catalogTable.location)
}
checkDataset(
spark.read.format("parquet").load(path.getCanonicalPath).as[Int],
1, 2, 3)
}

private def runTestWithStreamAppend(tableIdentifier: String) = {
