[SPARK-38237][SQL][SS] Introduce a new config to require all cluster keys on Aggregate #35552
Conversation
…hClusteredDistribution
cc. @cloud-fan @viirya @sunchao @xuanyuanking @c21 pinging reviewers of #35419. Thanks in advance!
Thanks for the PR @HeartSaVioR. I don't quite understand the motivation though. Could you rephrase a bit in the "Why are the changes needed?" section of the PR description? It'd be best if it comes with an example. Thanks.
I think this is understandable. It'd be better if you can provide an example in the description. But I'm a bit confused about how it links to this renaming effort. Do you mean because
Seconding @sunchao and @viirya, I would love a more elaborate explanation of the motivation. btw I think we discussed this before that
Since ClusteredDistribution is not strictly better than HashClusteredDistribution for all queries and vice versa, I think we should at least have a config for sending HashClusteredDistribution down as a distribution requirement, so that users can toggle the config for their queries when using ClusteredDistribution is worse than HashClusteredDistribution.
My bad, I should have picked the simpler case; the case I described is a complicated one. I'll update the PR description with a simpler one. There is a much simpler case: suppose a table is hash-partitioned by k1, with a small number of partitions, and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (expected) and the query will run super slowly. We seem to consider this a "trade-off", but it is not acceptable when the elapsed time of the query goes from minutes to hours. We are not yet very smart about choosing the best behavior automatically for a specific query, so we would like to provide a new config for end users to tune the behavior manually. The new config was missing in this PR (my bad again) and I'll add it in the PR. We'd like to narrow the scope of impact to aggregation for now, since the case is obvious for aggregation. We could consider another config or a broader scope of impact if we encounter more cases.
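As an illustration of the case described above, here is a hedged, minimal sketch using the DataFrame API (the table, key names, and skew construction are illustrative and not from the PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, sum, when}

object SkewedAggDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("skewed-agg-demo").getOrCreate()
    import spark.implicits._

    // Simulate a table hash-partitioned by k1 into a small number of partitions,
    // where almost every row shares the same k1 value (skew).
    val t1 = spark.range(0, 1000000)
      .select(
        when($"id" < 10, $"id").otherwise(lit(0L)).as("k1"),
        ($"id" % 1000).as("k2"),
        lit(1L).as("value"))
      .repartition(4, $"k1")

    // GROUP BY k1, k2: hashpartitioning(k1) already satisfies ClusteredDistribution(k1, k2),
    // so no additional shuffle is inserted for the aggregate and the skewed partition
    // ends up doing nearly all the work.
    val agg = t1.groupBy($"k1", $"k2").agg(sum($"value"))
    agg.explain()

    spark.stop()
  }
}
```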
 */
case class StatefulOpClusteredDistribution(
case class HashClusteredDistribution(
This and the lines below basically restore the implementation of HashClusteredDistribution.
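To make the difference concrete, here is a simplified, self-contained sketch (not the actual Spark implementation) of the semantics the restored class carries compared to ClusteredDistribution:

```scala
// ClusteredDistribution is satisfied by hash partitioning on any non-empty subset of the
// clustering keys, while HashClusteredDistribution demands hash partitioning on exactly
// the given keys, in the given order. Plain strings stand in for Catalyst expressions.
object DistributionSemanticsSketch {
  def satisfiesClustered(partitioningKeys: Seq[String], clusteringKeys: Seq[String]): Boolean =
    partitioningKeys.nonEmpty && partitioningKeys.forall(clusteringKeys.contains)

  def satisfiesHashClustered(partitioningKeys: Seq[String], clusteringKeys: Seq[String]): Boolean =
    partitioningKeys == clusteringKeys

  def main(args: Array[String]): Unit = {
    // Data partitioned by k1 only; aggregation clusters on (k1, k2):
    println(satisfiesClustered(Seq("k1"), Seq("k1", "k2")))      // true  -> no shuffle added
    println(satisfiesHashClustered(Seq("k1"), Seq("k1", "k2")))  // false -> shuffle on (k1, k2)
  }
}
```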
@@ -407,6 +407,16 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val REQUIRE_ALL_CLUSTER_KEYS_FOR_AGGREGATE =
I picked a similar config name with a similar description to the config above (spark.sql.requireAllClusterKeysForCoPartition), since the goal is very similar.
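For readers skimming the diff, a small runnable illustration of the naming parallel: both are plain boolean session-level flags. The co-partition flag exists in Spark; the aggregate-side name and default are taken from this PR's description and are assumptions here, since the PR was not merged in this form.

```scala
import org.apache.spark.sql.SparkSession

object ConfigNamingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("config-naming-demo").getOrCreate()

    // The existing co-partition flag the new name was modeled on.
    spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "true")
    // The aggregate-side flag proposed in this PR (illustrative; arbitrary runtime keys are accepted).
    spark.conf.set("spark.sql.aggregate.requireAllClusterKeys", "true")

    println(spark.conf.get("spark.sql.requireAllClusterKeysForCoPartition"))
    println(spark.conf.get("spark.sql.aggregate.requireAllClusterKeys"))

    spark.stop()
  }
}
```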
@@ -287,6 +287,9 @@ abstract class StreamExecution(
    // Disable cost-based join optimization as we do not want stateful operations
    // to be rearranged
    sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
    // Disable any config affecting the required child distribution of stateful operators.
    // Please read through the NOTE on the classdoc of HashClusteredDistribution for details.
    sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_AGGREGATE.key, "false")
This is super important. The new config should never be set to true before we fix the fundamental problem while considering backward compatibility, since stateful operators would follow the changed output partitioning as well.
Thanks @HeartSaVioR for the update!
By
I double-checked the query plan you added in the unit test:

val grouped = df
  // repartition by sub group keys which satisfies ClusteredDistribution(group keys)
  .repartition($"key1")
  .groupBy($"key1", $"key2")
  .agg(sum($"value"))

Enable the newly added config in this PR:

Disable the newly added config in this PR:

So the partial aggregate
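Since the plan snapshots did not carry over here, one hedged way to reproduce the comparison is to count the shuffle exchanges in the physical plan of the `grouped` DataFrame quoted above (assuming AQE is disabled so the plan is materialized up front):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count shuffle exchanges in a DataFrame's physical plan.
def numShuffles(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect { case s: ShuffleExchangeExec => s }.size

// Per the discussion: with the new config enabled, expect one more exchange (on key1, key2)
// than with it disabled, where the repartition on key1 alone satisfies the aggregate.
```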
Would this PR's config be useful when combined with your internal work mentioned here? Say, key1 is
@sigmod - maybe, but it still does not cover all cases. Note the feature to skip the partial aggregate only applies to the code-gen mode of hash aggregate. Other code paths (iterator mode of hash aggregate, object hash aggregate, sort aggregate) are not covered, though technically nothing stops us from adding support for all code paths. Based on our experience, the runtime skipping feature is useful but not a panacea. We still have to pay some cost for the partial aggregate to process some number of rows before realizing we can skip.
In addition, one can argue the manually tuned config might not be that useful. Let's just imagine the debugging workflow here. When users/developers realize a query has this data skew issue, we can also work around it without this newly added config through several options: (1) disable the bucketing config if the bucketed table is skewed, (2) remove the unnecessary repartition() in the query which caused the skew, (3) add a repartition() on the full group-by keys /
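A short sketch of workaround (3) from the list above; the DataFrame and column names are illustrative, not from the PR:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}

// Force the shuffle yourself: repartition on the full set of group-by keys before aggregating.
def aggregateOnFullKeys(df: DataFrame): DataFrame =
  df.repartition(col("k1"), col("k2"))
    .groupBy(col("k1"), col("k2"))
    .agg(sum(col("value")))
```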
@sigmod - I agree the join output can have data skew. If we talk about an aggregate followed by a join on a subset of keys (
I suspect the partial aggregate is not the main problem in your internal prod setting as per #28804 (comment), but the parallelism of the final aggregate is, correct?
@sigmod - I would like to elaborate more on where I come from. I am not pushing back on the feature with the goal of making it work for my internal company environment. In the Spark community, we are trying to move forward with the SPIP - storage partitioned join, and deprecate

Btw just to make sure we are on the same page: if the community would like to add the feature to skip the partial aggregate at runtime, we are happy to contribute it back in master.
@c21 the same problem also exists in

Note that AQE doesn't have a chance to kick in because there's no shuffle between those operators.
Thanks all for the valuable inputs, I really appreciate them! First of all, I have to admit I missed the partial aggregation. My bad. If I understand correctly, we seem to be on the same page that
(Please correct me if there is something we don't agree upon.)

I agree this fix doesn't seem to go the right way, but it still seems valuable to discuss further toward a better fix, regardless of whether we address it in this PR or defer it to another PR in the future.

A. I'd like to verify that there are alternatives end users can always leverage for any case. I agree DataFrame has no problem here, given the existence of

B. If we go with skipping the partial aggregate altogether when end users give a hint (e.g. a config, or a better way if one exists), would it work, and would it be an acceptable solution for us?

C. (out of scope for this PR, but to be future proof)
My understanding is you can already provide a hint in SQL (see SPARK-28746). If we do need to introduce this (which is still under discussion here), I'm wondering why we have to bring back
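For SQL-only users, the hint route mentioned here would look roughly like the following; the table and column names are illustrative, and the REPARTITION hint syntax is the partitioning-hint form documented for Spark 3.x:

```scala
// Assuming an existing SparkSession `spark` and a table t1(k1, k2, value).
val hinted = spark.sql(
  """
    |SELECT /*+ REPARTITION(k1, k2) */ k1, k2, SUM(value) AS total
    |FROM t1
    |GROUP BY k1, k2
  """.stripMargin)
hinted.explain()
```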
Hi all, I actually wanted to propose an alternative fix for this issue in #35574. At a high level, it achieves the same effect of allowing exact full cluster keys to be required, but it does not need to introduce

Could you help take a look and leave some comments? Thanks!
@sigmod - I concur we don't have an existing way to work around this issue if you cannot change the query (as you said, for auto-generated queries in reporting or visualization software). Although if you can change the query, you can add a
@HeartSaVioR - besides the hint @sunchao mentioned in #35552 (comment), you can add a
I feel introducing more manually tuned configs to allow users to disable partial aggregation does not work at scale. I am actually in favor of the query engine adaptively making the optimization under the hood, instead of leaving users to tune it. I feel a better approach is to adaptively disable partial aggregation during runtime if the reduction ratio is low - #28804 (comment).
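A purely conceptual sketch of the adaptive idea referenced here (#28804); this is not Spark code, and the threshold is an illustrative knob:

```scala
// Skip the map-side (partial) aggregate when grouping is not actually reducing the row count,
// i.e. the number of distinct groups seen so far stays close to the number of input rows.
def shouldSkipPartialAgg(rowsSeen: Long, groupsSeen: Long, skipThreshold: Double = 0.9): Boolean =
  rowsSeen > 0 && groupsSeen.toDouble / rowsSeen >= skipThreshold
```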
I think the data skew can happen, and we should allow users to work around it. But I feel introducing
I looked into #35574, and it looks good overall. I'll close this. Thanks for making the proposal better!
The config basically assumes the case where the query engine is not able to handle it smartly. If users find that the output partitioning before aggregation is skewed and the query has to aggregate, in most cases it is pretty clear that the partial aggregate does not help. I even doubt we have to be adaptive for this case, unless the condition for being adaptive can be determined without requiring actual execution.
Thank you @HeartSaVioR for proposing the fix in the first place, and leading the discussion!
@HeartSaVioR - The config for skipping the partial aggregate adaptively is similar to Spark AQE, in that it can be enabled by default. In our company's production environment, we actually enable the feature by default. So end users could potentially not notice the data skew at all, as Spark resolves it adaptively under the hood.
…clustering keys

### What changes were proposed in this pull request?
This PR is to allow `ClusteredDistribution` (such as operators with window, aggregate, etc.) to require full clustering keys. Traditionally an operator with `ClusteredDistribution` can be satisfied by `HashPartitioning` on a subset of clustering keys. This behavior can potentially lead to data skewness (comments raised from #35552). Although we have various ways to deal with the data skewness in this case, such as adding `repartition()`, disabling bucketing, adding a custom AQE rule, etc., there are still cases we cannot handle, e.g. data skewness in the same stage (`join(t1.x = t2.x)` followed by `window(t1.x, t1.y)`). This is addressed by the newly introduced config `spark.sql.requireAllClusterKeysForDistribution`.

### Why are the changes needed?
Allow users to work around data skewness issues when data is partitioned on a subset of keys.

### Does this PR introduce _any_ user-facing change?
Yes, the added config, but it is disabled by default.

### How was this patch tested?
Added unit tests in `DataFrameWindowFunctionsSuite.scala` and `DistributionSuite.scala`.

Closes #35574 from c21/exact-partition.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
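For completeness, a hedged usage sketch of the config that was merged via #35574; the config name is taken from the commit message above, while the DataFrame `events` and its columns are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

def rankWithinGroups(spark: SparkSession, events: DataFrame): DataFrame = {
  // With the flag on, the window requires partitioning on both x and y instead of being
  // satisfied by an upstream partitioning on x alone.
  spark.conf.set("spark.sql.requireAllClusterKeysForDistribution", "true")
  val w = Window.partitionBy(col("x"), col("y")).orderBy(col("ts"))
  events.withColumn("rn", row_number().over(w))
}
```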
What changes were proposed in this pull request?
This PR proposes to introduce a new config `spark.sql.aggregate.requireAllClusterKeys` (default: false) to force the required child distribution of Aggregate to use `HashClusteredDistribution` instead of `ClusteredDistribution`, which effectively requires all cluster keys on output partitioning.

(Technically it requires a stricter condition, but this PR takes the easier and less invasive way to implement it, since it is only effective when the config is enabled.)
This PR also proposes to rename `StatefulOpClusteredDistribution` back to `HashClusteredDistribution`, since it is now being used from batch queries as well. This PR retains the content of the classdoc for stateful operators in `HashClusteredDistribution`, along with new general content in the classdoc.

Why are the changes needed?
We found performance issues with the aggregate operator in some cases. To explain a simple case, suppose table t1 is hash-partitioned by k1 with a small number of partitions, and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (intended from Spark's point of view) and the query runs super slowly.

We observed query elapsed times go from minutes to hours due to this, so we can't simply call it a trade-off. Since we don't have a good way to deal with the issue automatically, we would like to at least provide a config for end users to tune by themselves.
Does this PR introduce any user-facing change?
Yes, users will have a new config to enforce requiring all grouping keys on the output partitioning of the child of the aggregate.
How was this patch tested?
New test.