Skip to content

Commit 6994bad

Browse files
Hisoka-X and MaxGekk
authored and committed
[SPARK-44262][SQL] Add dropTable and getInsertStatement to JdbcDialect
### What changes were proposed in this pull request? 1. This PR adds a `dropTable` function to `JdbcDialect`, so that users can override the drop-table SQL in other JDBC dialects such as Neo4J. Neo4J drop case: ```sql MATCH (m:Person {name: 'Mark'}) DELETE m ``` 2. It also adds `getInsertStatement` for the same reason. Neo4J insert case: ```sql MATCH (p:Person {name: 'Jennifer'}) SET p.birthdate = date('1980-01-01') RETURN p ``` Neo4J's SQL (in fact named `CQL`) is not like normal SQL, but it has a JDBC driver. ### Why are the changes needed? To make `JdbcDialect` more useful. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes apache#41855 from Hisoka-X/SPARK-44262_JDBCUtils_improve. Authored-by: Jia Fan <fanjiaeminem@qq.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 9bdad31 commit 6994bad

File tree

2 files changed

+35
-8
lines changed

2 files changed

+35
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
7878
* Drops a table from the JDBC database.
7979
*/
8080
def dropTable(conn: Connection, table: String, options: JDBCOptions): Unit = {
81-
executeStatement(conn, options, s"DROP TABLE $table")
81+
val dialect = JdbcDialects.get(options.url)
82+
executeStatement(conn, options, dialect.dropTable(table))
8283
}
8384

8485
/**
@@ -114,22 +115,19 @@ object JdbcUtils extends Logging with SQLConfHelper {
114115
isCaseSensitive: Boolean,
115116
dialect: JdbcDialect): String = {
116117
val columns = if (tableSchema.isEmpty) {
117-
rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
118+
rddSchema.fields
118119
} else {
119120
// The generated insert statement needs to follow rddSchema's column sequence and
120121
// tableSchema's column names. When appending data into some case-sensitive DBMSs like
121122
// PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
122123
// RDD column names for user convenience.
123-
val tableColumnNames = tableSchema.get.fieldNames
124124
rddSchema.fields.map { col =>
125-
val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
125+
tableSchema.get.find(f => conf.resolver(f.name, col.name)).getOrElse {
126126
throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
127127
}
128-
dialect.quoteIdentifier(normalizedName)
129-
}.mkString(",")
128+
}
130129
}
131-
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
132-
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
130+
dialect.insertIntoTable(table, columns)
133131
}
134132

135133
/**

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,24 @@ abstract class JdbcDialect extends Serializable with Logging {
193193
statement.executeUpdate(s"CREATE TABLE $tableName ($strSchema) $createTableOptions")
194194
}
195195

196+
/**
197+
* Returns an INSERT SQL statement template for inserting a row into the target table via a JDBC
198+
* connection. One "?" placeholder is emitted per field; columns and placeholders are joined by ",".
199+
* E.g. `INSERT INTO t ("name","age","gender") VALUES (?,?,?)` (quoting as done by quoteIdentifier)
200+
*
201+
* @param table The name of the table; it is interpolated into the statement as-is.
202+
* @param fields The fields of the row to insert; each field name is passed to quoteIdentifier.
203+
* @return The SQL statement to use for inserting data into the table.
204+
*/
205+
@Since("4.0.0")
206+
def insertIntoTable(
207+
table: String,
208+
fields: Array[StructField]): String = {
209+
val placeholders = fields.map(_ => "?").mkString(",")
210+
val columns = fields.map(x => quoteIdentifier(x.name)).mkString(",")
211+
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
212+
}
213+
196214
/**
197215
* Get the SQL query that should be used to find if the given table exists. Dialects can
198216
* override this method to return a query that works best in a particular database.
@@ -542,6 +560,17 @@ abstract class JdbcDialect extends Serializable with Logging {
542560
}
543561
}
544562

563+
/**
564+
* Builds the SQL statement used to drop the given table; this implementation returns `DROP TABLE <table>`.
565+
*
566+
* @param table the table name, interpolated into the statement as-is
567+
* @return The SQL statement to use for dropping the table.
568+
*/
569+
@Since("4.0.0")
570+
def dropTable(table: String): String = {
571+
s"DROP TABLE $table"
572+
}
573+
545574
/**
546575
* Build a create index SQL statement.
547576
*

0 commit comments

Comments
 (0)