[SPARK-12010][SQL] Add columnMapping support #10312


Closed · wants to merge 3 commits

Conversation

@CK50 (Contributor) commented Dec 15, 2015

In the past, Spark JDBC write only worked with technologies that support the following INSERT statement syntax (JdbcUtils.scala: insertStatement()):

INSERT INTO $table VALUES ( ?, ?, ..., ? )

But some technologies require a list of column names:

INSERT INTO $table ( $colNameList ) VALUES ( ?, ?, ..., ? )

This was blocking the use of e.g. the Progress JDBC Driver for Cassandra.

Another limitation is that the first syntax relies on the dataframe field ordering matching that of the target table. This works fine as long as the target table has been created by writer.jdbc().

If the target table contains more columns (i.e. it was not created by writer.jdbc()), then the insert fails due to a mismatch in the number of columns or their data types.

This PR adds an extra columnMapping parameter to write.jdbc(). This optional parameter allows the user to specify how dataframe field names map to target table column names.
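For illustration only, a minimal standalone sketch of how such a statement could be built; the object name, helper signature, and sample table/column names below are hypothetical, not the PR's actual patch:

```scala
// Hypothetical sketch (not the PR's code): build an INSERT statement,
// optionally translating dataframe field names via a columnMapping.
object InsertStatementSketch {
  def insertStatement(
      table: String,
      fieldNames: Seq[String],
      columnMapping: Map[String, String] = Map.empty): String = {
    // Each dataframe field maps to a target column name, defaulting to the field name itself.
    val columns = fieldNames.map(name => columnMapping.getOrElse(name, name))
    val placeholders = Seq.fill(fieldNames.size)("?").mkString(", ")
    s"INSERT INTO $table (${columns.mkString(", ")}) VALUES ($placeholders)"
  }

  def main(args: Array[String]): Unit = {
    // Dataframe fields (id, name) written to table columns (id, FULL_NAME).
    println(insertStatement("people", Seq("id", "name"), Map("name" -> "FULL_NAME")))
    // prints: INSERT INTO people (id, FULL_NAME) VALUES (?, ?)
  }
}
```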

@AmplabJenkins commented:

Can one of the admins verify this patch?

"""
if properties is None:
properties = dict()
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
self._jwrite.mode(mode).jdbc(url, table, jprop)
if columnMapping is None:
columnMapping = dict()
srowen (Member) commented on python/pyspark/sql/readwriter.py:

Then indent looks too deep here

CK50 (Contributor, Author) replied:

Will do updates. - Can you please remind me of the best git commands for updating the PR without creating extra commits?


srowen (Member) replied:

Extra commits are good and fine, as they help show the evolution of the PR. They are squashed in the end. You can squash several of your own new commits into one before pushing if that would more logically group and present your subsequent changes.

In JdbcUtils.scala, insertStatement():

```scala
      columnMapping: scala.collection.Map[String, String] = null): String = {
    if (columnMapping == null) {
      rddSchema.fields.map(field => "?")
        .mkString( s"INSERT INTO $table VALUES (", ", ", " ) ")
```
Member commented:

There's still an extra space before the first arg in both mkString calls.
Also field => "?" could be _ => "?" for a touch more clarity. Finally I think the match statement actually reduces to columnMapping.getOrElse(field.name, field.name)?
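As a standalone illustration of that getOrElse reduction (the Field case class below just stands in for Spark's StructField, and the sample data is made up):

```scala
// Toy illustration of the suggested reduction; Field stands in for StructField.
case class Field(name: String)

object GetOrElseSketch {
  def main(args: Array[String]): Unit = {
    val fields = Seq(Field("id"), Field("name"))
    val columnMapping = Map("name" -> "FULL_NAME")

    // Matching on columnMapping.get(field.name) with a fallback to field.name
    // collapses into a single getOrElse call:
    val columns = fields.map(field => columnMapping.getOrElse(field.name, field.name))
    println(columns.mkString(", ")) // prints: id, FULL_NAME
  }
}
```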

@srowen (Member) commented Dec 17, 2015

@cloud-fan @marmbrus @rxin could we maybe get an opinion on the API change? it's the idea of adding a column mapping to allow for the dataframe's column names to map to different names in a table. I hesitate to commit this without a thumbs up from someone else.

@rxin (Contributor) commented Dec 17, 2015

Wouldn't a better solution be to just use DataFrame's own renaming of columns? I am not sure why we want this to be a jdbc-specific feature. If we do this, don't we need to do it for every single other data source?

@srowen (Member) commented Dec 17, 2015

Yeah fair point -- just project / rename a dataframe first. WDYT @CK50 ?
It doesn't change the underlying point about sometimes needing to explicitly name the columns in the INSERT.
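For example, the project/rename-first approach might look roughly like this; the connection URL, table, column names, and credentials are placeholders, and the sketch uses the current SparkSession API rather than the SQLContext of this PR's era:

```scala
// Sketch of renaming dataframe columns before a plain write.jdbc call;
// all names and connection details here are placeholders, not from the PR.
import java.util.Properties
import org.apache.spark.sql.SparkSession

object RenameThenWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rename-then-write").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

    // Rename to match the target table instead of passing a jdbc-specific columnMapping.
    val renamed = df.withColumnRenamed("name", "FULL_NAME")

    val props = new Properties()
    props.setProperty("user", "scott")
    props.setProperty("password", "tiger")
    renamed.write.jdbc("jdbc:some:driver://host:1234/db", "PEOPLE", props)

    spark.stop()
  }
}
```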

@rxin (Contributor) commented Dec 17, 2015

Explicitly naming columns in inserts is ok (good to have, actually), but I wouldn't add jdbc-specific column naming.

@marmbrus (Contributor) commented:

+1 to explicit naming, but without the extra jdbc-specific ability to rename.

@CK50 (Contributor, Author) commented Dec 17, 2015

@srowen, @rxin, @marmbrus
If everyone is fine with always using column names in the generated INSERT statement, and with these column names always being taken from the DataFrame object, then I can go back to my earlier approach.
Can you please confirm? If so, I will change the code accordingly.

@srowen (Member) commented Dec 17, 2015

@CK50 yes I personally think it's safer and more compatible to always put col names in the INSERT statement, and assume the caller has lined up the DataFrame column names. So yes it's your same change but just with the mapping logic backed out.
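In other words, the generated statement would always name the dataframe's own columns, roughly as in this sketch (not the follow-up PR's actual code; the schema and table name are made up):

```scala
// Sketch only: the INSERT always names the dataframe's own columns,
// taken straight from its schema.
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object InsertWithColumnNamesSketch {
  def insertStatement(table: String, rddSchema: StructType): String = {
    val columns = rddSchema.fields.map(_.name).mkString(", ")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(", ")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType)))
    println(insertStatement("PEOPLE", schema))
    // prints: INSERT INTO PEOPLE (id, name) VALUES (?, ?)
  }
}
```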

@CK50 (Contributor, Author) commented Dec 18, 2015

Closing and going back to the initial approach in a new PR.

@CK50 closed this Dec 18, 2015
dongjoon-hyun pushed a commit that referenced this pull request Nov 22, 2019
…ValueGroupedDataset

### What changes were proposed in this pull request?

This PR proposes to add an `as` API to RelationalGroupedDataset. It creates a KeyValueGroupedDataset instance using the given grouping expressions, instead of a typed function as in the groupByKey API. Because it can leverage existing columns, it can reuse the existing data partitioning, if any, when doing operations like cogroup.

### Why are the changes needed?

Currently, if users want to do cogroup on DataFrames, there is no good way to do it other than going through KeyValueGroupedDataset.

1. KeyValueGroupedDataset ignores any existing data partitioning, which is a problem.
2. groupByKey calls a typed function to create additional keys; you cannot reuse existing columns if you just need to group by them.

```scala
// df1 and df2 are already partitioned and sorted.
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
```
```scala
// This groupBy.as.cogroup won't unnecessarily repartition the data
val df3 = df1.groupBy("a").as[Int]
  .cogroup(df2.groupBy("a").as[Int]) { case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(5) SerializeFromObject [input[0, int, false] AS value#11247]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4922/12067092816eec1b6f, a#11209: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11209], [a#11225], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11246: int
   :- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#11209, 5), false, [id=#10218]
   :     +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :        +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(4) Sort [a#11225 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#11225, 5), false, [id=#10223]
         +- *(3) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
            +- *(3) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

```scala
// Current approach creates additional AppendColumns and repartition data again
val df4 = df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) {
  case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(7) SerializeFromObject [input[0, int, false] AS value#11257]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4933/138102700737171997, value#11252: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11252], [value#11254], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11256: int
   :- *(3) Sort [value#11252 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#11252, 5), true, [id=#10302]
   :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4930/19529195347ce07f47, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11252]
   :        +- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(a#11209, 5), false, [id=#10297]
   :              +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :                 +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(6) Sort [value#11254 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#11254, 5), true, [id=#10312]
         +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4932/15265288491f0e0c1f, createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11254]
            +- *(5) Sort [a#11225 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#11225, 5), false, [id=#10307]
                  +- *(4) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
                     +- *(4) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

### Does this PR introduce any user-facing change?

Yes, this adds a new `as` API to RelationalGroupedDataset. Users can use it to create a KeyValueGroupedDataset and do cogroup.

### How was this patch tested?

Unit tests.

Closes #26509 from viirya/SPARK-29427-2.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>