
[SPARK-38237][SQL][SS] Introduce a new config to require all cluster keys on Aggregate #35552


Closed · wants to merge 2 commits

Conversation

@HeartSaVioR commented Feb 17, 2022

What changes were proposed in this pull request?

This PR proposes to introduce a new config spark.sql.aggregate.requireAllClusterKeys (default: false) to force the required child distribution of Aggregate to use HashClusteredDistribution instead of ClusteredDistribution, which effectively requires all cluster keys on the output partitioning.
(Technically it requires an even stricter condition, but this PR takes the easier and less invasive implementation, since it is only effective when the config is enabled.)
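A minimal sketch of the proposed usage (assuming a SparkSession named spark; the config name is the one proposed in this PR):

// With this enabled, the aggregate's required child distribution covers all
// grouping keys, so a partitioning on only a subset of them no longer
// satisfies it and Spark inserts a shuffle on the full grouping keys.
spark.conf.set("spark.sql.aggregate.requireAllClusterKeys", "true")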

This PR also proposes to rename StatefulOpClusteredDistribution back to HashClusteredDistribution, since it is now used from batch queries as well. The PR retains the stateful-operator content of the classdoc in HashClusteredDistribution, along with new general classdoc content.

Why are the changes needed?

We identified performance issues with the aggregate operator in some cases. To explain a simple case: suppose table t1 is hash partitioned by k1 with a small number of partitions, and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (intentional from Spark's point of view) and the query runs extremely slowly.
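A hypothetical reproduction sketch of that scenario (not the PR's actual test; the DataFrame and column names are illustrative, and spark is assumed to be a SparkSession):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// Two distinct values of k1 => max parallelism of 2 after repartition(k1).
val df = spark.range(0, 1000000L)
  .selectExpr("id % 2 AS k1", "id AS k2", "rand() AS value")

df.repartition($"k1")       // output partitioning: hashpartitioning(k1)
  .groupBy($"k1", $"k2")    // ClusteredDistribution(k1, k2) is already satisfied,
  .agg(sum($"value"))       // so no extra shuffle is added before the aggregate
  .explain()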

We have observed query slowness grow from minutes to hours because of this, so we can't simply call it a trade-off. Since we don't have a good way to deal with the issue automatically, we would like to at least provide a config so end users can tune the behavior themselves.

Does this PR introduce any user-facing change?

Yes, users get a new config to enforce all grouping keys on the output partitioning of the child of an aggregate.

How was this patch tested?

New test.

@HeartSaVioR

cc. @cloud-fan @viirya @sunchao @xuanyuanking @c21 pinging reviewers of #35419. Thanks in advance!

@sunchao commented Feb 17, 2022

Thanks for the PR @HeartSaVioR. I don't quite understand the motivation, though. Could you rephrase the "Why are the changes needed?" section of the PR description a bit? It'd be best if it came with an example. Thanks.

@viirya commented Feb 17, 2022

We figured out that HashClusteredDistribution is still desirable in some cases even without stateful operators; HashPartitioning with subset of grouping keys can satisfy ClusteredDistribution, which means the cardinality of the subset of grouping keys technically defines the max parallelism. Increasing the number of partitions does not always help to solve the skew of the partitions.

I think this is understandable. It'd be better if you could provide an example in the description. But I'm a bit confused about how it links to this renaming effort. Do you mean that because StatefulOpClusteredDistribution is not only for stateful operations, you propose to rename it back? As it was removed and renamed before, do we have any place that needs to use HashClusteredDistribution now?

@c21 commented Feb 17, 2022

Seconding @sunchao and @viirya, I would love a more elaborate explanation of the motivation. Btw, I think we discussed before that HashPartitioning on a subset of grouping keys satisfying ClusteredDistribution can be a trade-off. On one hand, it can be bad because it may cause data skew if the cardinality of the subset of keys is not big enough. On the other hand, it can be good because it can help avoid the shuffle before join/group-by/etc. Whether we want to change an existing non-stateful operator, or have a new operator use HashClusteredDistribution rather than ClusteredDistribution, we still need to discuss the trade-off in depth.

@sigmod commented Feb 17, 2022

As it was removed and renamed before, do we have any place that needs to use HashClusteredDistribution now?
HashPartitioning with subset of grouping keys satisfying ClusteredDistribution, can be a tradeoff.

Since ClusteredDistribution is not strictly better than HashClusteredDistribution for all queries and vice versa, I think we should at least have a config for pushing HashClusteredDistribution down as a distribution requirement, so that users can toggle it for queries where ClusteredDistribution is worse than HashClusteredDistribution.

@HeartSaVioR

My bad, I should have picked the simpler case; the one I described is complicated. I'll update the PR description with a simpler one.

There is a much simpler case: suppose a table is hash partitioned by k1 with a small number of partitions, and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (expected) and the query runs extremely slowly.

We seem to consider this a "trade-off", but it is not acceptable when the elapsed time of the query grows from minutes to hours. We are not yet smart enough to choose the best behavior automatically for a specific query, so we would like to provide a new config for end users to tune the behavior manually. The new config was missing from this PR (my bad again); I'll add it.

We'd like to narrow the scope of impact to aggregation for now, since the case is obvious for aggregation. We could consider another config, or a broader scope of impact, if we encounter more cases.

@HeartSaVioR HeartSaVioR changed the title [SPARK-38237][SQL][SS] Rename back StatefulOpClusteredDistribution to HashClusteredDistribution [SPARK-38237][SQL][SS] Introduce a new config to require all cluster keys on Aggregate Feb 18, 2022
- case class StatefulOpClusteredDistribution(
+ case class HashClusteredDistribution(
@HeartSaVioR

This and the lines below basically restore the implementation of HashClusteredDistribution.

@@ -407,6 +407,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val REQUIRE_ALL_CLUSTER_KEYS_FOR_AGGREGATE =
@HeartSaVioR

I picked a config name and description similar to the config above (spark.sql.requireAllClusterKeysForCoPartition), since the goal is very similar.

@@ -287,6 +287,9 @@ abstract class StreamExecution(
// Disable cost-based join optimization as we do not want stateful operations
// to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
// Disable any config affecting the required child distribution of stateful operators.
// Please read through the NOTE on the classdoc of HashClusteredDistribution for details.
sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_AGGREGATE.key, "false")
@HeartSaVioR

This is super important. The new config should never be set to true here until we fix the fundamental problem with backward compatibility in mind, since stateful operators would follow the changed output partitioning as well.

@c21 commented Feb 18, 2022

Thanks @HeartSaVioR for the update!

There is a much simpler case: suppose a table is hash partitioned by k1 with a small number of partitions, and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (expected) and the query runs extremely slowly.

By "table is hash partitioned by k1", are you referring to a table bucketed on k1, or some other scenario? I understand the query can get arbitrarily complicated depending on the business logic of an individual environment, but I would like to dig into it more. I think we have a couple of options:

  • Data skew in a bucketed table causes slowness not only in aggregate but also in all other operators, such as shuffle join. I think we'd better fix the issue at the root, i.e. the table scan operator. The table scan operator can decide whether to set its outputPartitioning to HashPartitioning(bucketed_keys) based on some statistics. One option is to check the bucket file size, and optionally the row count per file if available, during query planning. If the file size/row count is skewed (a certain bucket file is too large or has too many rows), do not populate outputPartitioning, i.e. do not propagate the bucketing output partitioning down the query plan. This avoids changing requiredChildDistribution for the aggregate operator, and fixes the issue for other operators like join as well.
  • Data skew in the middle of the plan. Then we can use an AQE strategy to handle it. For example, if the shuffle before an aggregate on a subset of keys has data skew (i.e. shuffle(x) before aggregate(x, y)), we can add an AQE rule that dynamically adds a shuffle(x, y) after shuffle(x) but before aggregate(x, y), based on the runtime shuffle size of shuffle(x). This also solves the issue, and has the benefit of auto-tuning the query adaptively without manually setting the config per query/job (manually tuning configs per query is not scalable, right?).

We seem to consider this a "trade-off", but it is not acceptable when the elapsed time of the query grows from minutes to hours. We are not yet smart enough to choose the best behavior automatically for a specific query, so we would like to provide a new config for end users to tune the behavior manually.

I double-checked the query plan you added in the unit test:

val grouped = df
  // repartition by a subset of the group keys, which satisfies ClusteredDistribution(group keys)
  .repartition($"key1")
  .groupBy($"key1", $"key2")
  .agg(sum($"value"))

With the newly added config enabled:

*(3) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
+- Exchange hashpartitioning(key1#10, key2#11, 5), ENSURE_REQUIREMENTS, [id=#25]
   +- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
      +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#21]
         +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
            +- *(1) LocalTableScan [_1#3, _2#4, _3#5]

With the newly added config disabled:

*(2) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
+- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
   +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#19]
      +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
         +- *(1) LocalTableScan [_1#3, _2#4, _3#5]

So the partial aggregate HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)] ... is always there whether the config is enabled or disabled. I doubt this feature can help with the performance of data skew, as you can see we have to do the partial aggregate on the skewed partition anyway. The partial aggregate will be the major cost here. Could you share a concrete aggregate query example with data skew?

@sigmod commented Feb 18, 2022

The partial aggregate will be the major cost here.

Would this PR's config be useful when combined with your internal work mentioned here?
#28804 (comment)

Say key1 is gender and key2 is customerId: a parallelism of 2 is too low even if you can smartly skip the partial agg.

@c21 commented Feb 18, 2022

Would this PR's config be useful when combined with your internal work mentioned here?
#28804 (comment)

@sigmod - maybe, but it still doesn't cover all cases. Note the feature to skip partial aggregation only applies to the code-gen mode of hash aggregate. Other code paths (iterator mode of hash aggregate, object hash aggregate, sort aggregate) are not covered, though technically nothing stops us from adding support for all code paths. Based on our experience, the runtime skipping feature is useful but not a panacea: we still have to pay some cost in the partial aggregate, processing some number of rows before realizing we can skip.

Say key1 is gender and key2 is customerId: a parallelism of 2 is too low even if you can smartly skip the partial agg.

In addition, one can argue the manually tuned config might not be that useful. Let's imagine the debugging workflow here. When users/developers realize a query has this data skew issue, we can already work around it without the newly added config, via several options: (1) disable the bucketing config if the bucketed table is skewed; (2) remove the unnecessary repartition() in the query which caused the skew; (3) add a repartition() on the full group-by keys / DISTRIBUTE BY (...) in the SQL query before the aggregate (see the sketch below). So I feel this renders the config useless.
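A minimal sketch of option (3) in the DataFrame API (df and the column names are illustrative):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// Force a shuffle on the full group-by keys, so the aggregate does not
// reuse the skewed subset-key partitioning.
df.repartition($"key1", $"key2")
  .groupBy($"key1", $"key2")
  .agg(sum($"value"))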

@sigmod commented Feb 18, 2022

we can already work around it without the newly added config, via several options

  • For (1) and (2), undesirable situations can happen beyond those two, e.g. skew arising in the output of a many-to-many join;
  • For (3), sometimes modifying queries is not feasible, e.g. queries generated by a reporting or visualization software stack.

@c21 commented Feb 18, 2022

For (1) and (2), undesirable situations can happen beyond those two, e.g. skew arising in the output of a many-to-many join;

@sigmod - I agree the join output can have data skew. If we're talking about an aggregate following a join on a subset of keys (join(t1.x = t2.x) followed by aggregate(t1.x, t1.y)), the partial aggregate would again be the major cost, same as the example in #35552 (comment). I am worried whether the feature introduced here actually fixes the problem.

@sigmod commented Feb 18, 2022

the partial aggregate would again be the major cost, same as the example in #35552 (comment)

I suspect the partial agg is not the main problem in your internal prod setting as per #28804 (comment), but the parallelism of the final agg is, correct?

@c21 commented Feb 18, 2022

I suspect the partial agg is not the main problem in your internal prod setting as per #28804 (comment), but the parallelism of the final agg is, correct?

@sigmod - I would like to elaborate on where I come from. I am not pushing back on the feature with the goal of making it fit my internal company environment. In the Spark community, we are trying to move forward with the SPIP for storage-partitioned join, and deprecating HashClusteredDistribution is one part of that work. So it's not preferable to add the whole thing back if there's an alternative option. For the stream-stream join operator, we had to add the distribution StatefulOpClusteredDistribution back for state store correctness only, and I don't think it's the right direction to make it more general. I thought we only wanted to make a special case for SS operators, but it looks like here we want to keep expanding the functionality.

Btw just to make sure we are on the same page: if the community would like to add the feature to skip partial aggregation at runtime, we are happy to contribute it back in master.

@sigmod commented Feb 18, 2022

StatefulOpClusteredDistribution back for state store correctness only, and I don't think it's the right direction to make it more general

@c21 the same problem also exists in join(t1.x = t2.x) followed by window(t1.x, t1.y), or join(t1.x = t3.x and t1.y = t3.y).

Note that AQE doesn't have a chance to kick in, because there is no shuffle between those operators. Thus, I suspect configs with HashClusteredDistribution can at least rescue such queries from timeouts, disk-space-full, etc. It's not a correctness issue, but it can be just as severe if a user cannot see the result of a query that runs forever and has no config workaround.

@HeartSaVioR commented Feb 18, 2022

Thanks all for the valuable input. I really appreciate it!

First of all, I have to admit I missed the partial aggregation. My bad.

If I understand correctly, we seem to be on the same page that

  1. There are various cases where skew happens before aggregation (e.g. join), and ClusteredDistribution wouldn't deal with the skew, since the data is already clustered as required.
  2. In the above cases, there are situations Spark cannot handle automatically (e.g. insufficient stats).
  3. We do partial aggregation, hence simply changing the required child distribution to HashClusteredDistribution wouldn't help.

(Please correct me if there is something we don't agree upon.)

I agree this fix doesn't seem to go the right way, but it still seems valuable to discuss a better fix, regardless of whether we address it in this PR or defer to another PR in the future.

A.
(First of all, sorry for my ignorance. I haven't dealt with pure SQL statements, so my question could be very silly.)

I'd like to verify that there are alternatives end users can always leverage in any case. I agree DataFrame has no problem here given the existence of repartition(). But how about SQL statements? Do we provide the same for SQL? Can end users inject a hint anywhere before an operator and have it take effect in the exact place?

B.
3) is valid and a great point I totally missed. But then there is another question: why do we do a partial aggregate even when users express the intention to do a full shuffle because they indicate a skew? I suspect we have to skip it.

If we go with skipping the partial aggregate altogether when end users give a hint (e.g. a config, or a better mechanism if one exists), would it work, and would it be an acceptable solution for us?

C. (out of scope for this PR, but to be future-proof)
Let's expand this to the broader set of operators leveraging ClusteredDistribution. Point 3) doesn't even exist for them, so we have a problem there as well. What would be the way to fix it when users identify the issue and want to deal with it?

@sunchao commented Feb 19, 2022

I agree DataFrame has no problem here given the existence of repartition(). But how about SQL statements? Do we provide the same for SQL? Can end users inject a hint anywhere before an operator and have it take effect in the exact place?

My understanding is you can already provide hint in SQL (see SPARK-28746).

If we do need to introduce this (which is still under discussion here), I'm wondering why we have to bring back HashClusteredDistribution instead of just changing the behavior of HashPartitioning.satisfies according to the config. This config is also very similar to the existing spark.sql.requireAllClusterKeysForCoPartition.
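To make the idea concrete, a simplified sketch of what a config-dependent satisfies check could look like (illustrative pseudologic only, not Spark's actual Expression-based implementation; names are hypothetical):

// Stand-in for HashPartitioning.satisfies(ClusteredDistribution(...)):
// with the config on, require an exact match of all clustering keys
// instead of accepting any subset.
def satisfiesClustered(
    partitioningKeys: Seq[String],
    clusteringKeys: Seq[String],
    requireAllClusterKeys: Boolean): Boolean = {
  if (requireAllClusterKeys) {
    partitioningKeys == clusteringKeys                    // all keys, same order
  } else {
    partitioningKeys.nonEmpty &&
      partitioningKeys.forall(clusteringKeys.contains)    // subset is enough
  }
}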

@c21 commented Feb 19, 2022

Hi all, I actually want to propose an alternative fix for this issue in #35574. At a high level, it achieves the same effect of allowing operators to require the exact full cluster keys, but it does not need to reintroduce HashClusteredDistribution, and it works for all operators requiring ClusteredDistribution.

Could you help take a look and leave some comments? Thanks!

@c21 commented Feb 19, 2022

the same problem also exists in join(t1.x = t2.x) followed by window(t1.x, t1.y), or join(t1.x = t3.x and t1.y = t3.y).
Note that AQE doesn't have a chance to kick in, because there is no shuffle between those operators.

@sigmod - I concur we don't have an existing way to work around this issue if you cannot change the query (as you said, for auto-generated queries from reporting or visualization software). If you can change the query, though, you can add a DISTRIBUTE BY (x, y) between the join and the window to work around it.

I'd like to verify that there are alternatives end users can always leverage in any case. I agree DataFrame has no problem here given the existence of repartition(). But how about SQL statements? Do we provide the same for SQL? Can end users inject a hint anywhere before an operator and have it take effect in the exact place?

@HeartSaVioR - besides the hint @sunchao mentioned in #35552 (comment), you can add a DISTRIBUTE BY (columns) SQL clause when you want to repartition in a SQL query - https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-distribute-by.html .
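A minimal SQL sketch of that workaround (table and column names are illustrative):

// DISTRIBUTE BY inside the subquery forces a shuffle on the full grouping
// keys before the aggregate, so the skewed subset-key partitioning is not reused.
spark.sql("""
  SELECT key1, key2, SUM(value) AS total
  FROM (SELECT key1, key2, value FROM t DISTRIBUTE BY key1, key2) tmp
  GROUP BY key1, key2
""")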

3) is valid and a great point I totally missed. But then there is another question: why do we do a partial aggregate even when users express the intention to do a full shuffle because they indicate a skew? I suspect we have to skip it.
If we go with skipping the partial aggregate altogether when end users give a hint (e.g. a config, or a better mechanism if one exists), would it work, and would it be an acceptable solution for us?

I feel introducing more manually tuned configs to let users disable the partial aggregate does not work at scale. I am in favor of the query engine adaptively making the optimization under the hood, instead of leaving users to tune it. I feel a better approach is to adaptively disable the partial aggregate at runtime if the reduction ratio is low - #28804 (comment) .

Let's expand this to the broader set of operators leveraging ClusteredDistribution. Point 3) doesn't even exist for them, so we have a problem there as well. What would be the way to fix it when users identify the issue and want to deal with it?

I think the data skew can happen, and we should allow users to work around it. But I feel bringing HashClusteredDistribution back is overkill; how about the approach in #35574? Would love more opinions on it.

@HeartSaVioR

I looked into #35574, and it looks good overall. I'll close this one. Thanks for making the proposal better!

@HeartSaVioR commented Feb 20, 2022

I feel introducing more manually tuned configs to let users disable the partial aggregate does not work at scale. I am in favor of the query engine adaptively making the optimization under the hood, instead of leaving users to tune it. I feel a better approach is to adaptively disable the partial aggregate at runtime if the reduction ratio is low - #28804 (comment) .

The config basically assumes the case where the query engine is not able to handle it smartly. If users find that the output partitioning before aggregation has skew and the data has to be aggregated, in most cases it is pretty clear the partial aggregate does not help. I even doubt we have to be adaptive in this case, unless the adaptivity condition can be determined without requiring actual execution.

@c21 commented Feb 20, 2022

I looked into #35574, and it looks good overall. I'll close this one. Thanks for making the proposal better!

Thank you @HeartSaVioR for proposing the fix in the first place, and leading the discussion!

I even doubt we have to be adaptive in this case, unless the adaptivity condition can be determined without requiring actual execution.

@HeartSaVioR - the config for skipping the partial aggregate adaptively is similar to Spark AQE, which can be enabled by default. In our company's production environment, we actually enable the feature by default. So end users potentially don't notice the data skew at all, as Spark resolves it adaptively under the hood.

sunchao pushed a commit that referenced this pull request Feb 25, 2022
…clustering keys

### What changes were proposed in this pull request?

This PR allows `ClusteredDistribution` (such as operators with window, aggregate, etc.) to require the full clustering keys. Traditionally, an operator with `ClusteredDistribution` can be satisfied by `HashPartitioning` on a subset of the clustering keys. This behavior can potentially lead to data skew (per comments raised in #35552). Although we have various ways to deal with data skew in this case, such as adding `repartition()`, disabling bucketing, or adding a custom AQE rule, there are still cases we cannot handle, e.g. data skew within the same stage (`join(t1.x = t2.x)` followed by `window(t1.x, t1.y)`). The newly introduced config `spark.sql.requireAllClusterKeysForDistribution` addresses these cases.
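A minimal usage sketch (assuming a SparkSession named spark; the config name is the one introduced by this commit):

// When enabled, operators requiring ClusteredDistribution only accept an
// output partitioning that covers all of the clustering keys.
spark.conf.set("spark.sql.requireAllClusterKeysForDistribution", "true")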

### Why are the changes needed?

Allow users to work around the data skew issue when data is partitioned on a subset of keys.

### Does this PR introduce _any_ user-facing change?

Yes, the added config, but it is disabled by default.

### How was this patch tested?

Added unit tests in `DataFrameWindowFunctionsSuite.scala` and `DistributionSuite.scala`

Closes #35574 from c21/exact-partition.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Chao Sun <sunchao@apple.com>