-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-12329][SQL]Fix code in ClientWrapper which prints to stdout instead of stderr #10302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ok to test |
Why should they to go stderr? They are the results of a command you ran, not some side message. |
I did not run these SET commands. These "SET key value" messages show up in stdout, even before my script runs. My expectation was that only the results of my script should go to stdout. Is that incorrect? |
Test build #47700 has finished for PR 10302 at commit
|
Thanks for the pull request. I'm going through a list of pull requests to cut them down since the sheer number is breaking some of the tooling we have. Due to lack of activity on this pull request, I'm going to push a commit to close it. Feel free to reopen it or create a new one. |
…ValueGroupedDataset ### What changes were proposed in this pull request? This PR proposes to add `as` API to RelationalGroupedDataset. It creates KeyValueGroupedDataset instance using given grouping expressions, instead of a typed function in groupByKey API. Because it can leverage existing columns, it can use existing data partition, if any, when doing operations like cogroup. ### Why are the changes needed? Currently if users want to do cogroup on DataFrames, there is no good way to do except for KeyValueGroupedDataset. 1. KeyValueGroupedDataset ignores existing data partition if any. That is a problem. 2. groupByKey calls typed function to create additional keys. You can not reuse existing columns, if you just need grouping by them. ```scala // df1 and df2 are certainly partitioned and sorted. val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c") .repartition($"a").sortWithinPartitions("a") val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c") .repartition($"a").sortWithinPartitions("a") ``` ```scala // This groupBy.as.cogroup won't unnecessarily repartition the data val df3 = df1.groupBy("a").as[Int] .cogroup(df2.groupBy("a").as[Int]) { case (key, data1, data2) => data1.zip(data2).map { p => p._1.getInt(2) + p._2.getInt(2) } } ``` ``` == Physical Plan == *(5) SerializeFromObject [input[0, int, false] AS value#11247] +- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4922/12067092816eec1b6f, a#11209: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11209], [a#11225], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11246: int :- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(a#11209, 5), false, [id=#10218] : +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211] : +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204] +- *(4) Sort [a#11225 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#11225, 5), false, [id=#10223] +- *(3) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227] +- *(3) LocalTableScan [_1#11218, _2#11219, _3#11220] ``` ```scala // Current approach creates additional AppendColumns and repartition data again val df4 = df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) { case (key, data1, data2) => data1.zip(data2).map { p => p._1.getInt(2) + p._2.getInt(2) } } ``` ``` == Physical Plan == *(7) SerializeFromObject [input[0, int, false] AS value#11257] +- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4933/138102700737171997, value#11252: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11252], [value#11254], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11256: int :- *(3) Sort [value#11252 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(value#11252, 5), true, [id=#10302] : +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4930/19529195347ce07f47, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11252] : +- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(a#11209, 5), false, [id=#10297] : +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211] : +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204] +- *(6) Sort [value#11254 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(value#11254, 5), true, [id=#10312] +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4932/15265288491f0e0c1f, createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11254] +- *(5) Sort [a#11225 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#11225, 5), false, [id=#10307] +- *(4) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227] +- *(4) LocalTableScan [_1#11218, _2#11219, _3#11220] ``` ### Does this PR introduce any user-facing change? Yes, this adds a new `as` API to RelationalGroupedDataset. Users can use it to create KeyValueGroupedDataset and do cogroup. ### How was this patch tested? Unit tests. Closes #26509 from viirya/SPARK-29427-2. Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
When I run "$spark-sql -f ", I see that few "SET key value" messages get printed on stdout instead of stderr. These messages should go to stderr.
@marmbrus